You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/05/23 06:59:51 UTC

[incubator-inlong] branch master updated: [INLONG-4240][Manager] Support Postgres source and sink (#4241)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 7e8ef88b7 [INLONG-4240][Manager] Support Postgres source and sink (#4241)
7e8ef88b7 is described below

commit 7e8ef88b7571af5fc1826981172634ae32b31e21
Author: baomingyu <ba...@163.com>
AuthorDate: Mon May 23 01:59:46 2022 -0500

    [INLONG-4240][Manager] Support Postgres source and sink (#4241)
---
 .../apache/inlong/common/enums/TaskTypeEnum.java   |   4 +-
 .../manager/client/api/sink/PostgresSink.java      |  71 +++++++
 .../manager/client/api/source/PostgresSource.java  |  76 +++++++
 .../manager/client/api/util/InlongParser.java      |  17 ++
 .../client/api/util/InlongStreamSinkTransfer.java  |  76 +++++++
 .../api/util/InlongStreamSourceTransfer.java       |  52 +++++
 .../inlong/manager/common/enums/SinkType.java      |   3 +-
 .../inlong/manager/common/enums/SourceType.java    |   4 +-
 .../common/pojo/sink/postgres/PostgresSinkDTO.java |  93 +++++++++
 .../sink/postgres/PostgresSinkListResponse.java    |  51 +++++
 .../pojo/sink/postgres/PostgresSinkRequest.java    |  56 +++++
 .../pojo/sink/postgres/PostgresSinkResponse.java   |  58 ++++++
 .../pojo/source/postgres/PostgresSourceDTO.java    |  96 +++++++++
 .../postgres/PostgresSourceListResponse.java       |  66 ++++++
 .../source/postgres/PostgresSourceRequest.java     |  71 +++++++
 .../source/postgres/PostgresSourceResponse.java    |  63 ++++++
 .../sink/postgres/PostgresSinkOperation.java       | 229 +++++++++++++++++++++
 .../service/sort/util/ExtractNodeUtils.java        |  25 +++
 .../manager/service/sort/util/LoadNodeUtils.java   |  27 +++
 .../source/postgres/PostgresSourceOperation.java   | 100 +++++++++
 .../core/sink/PostgresStreamSinkServiceTest.java   | 102 +++++++++
 21 files changed, 1337 insertions(+), 3 deletions(-)

diff --git a/inlong-common/src/main/java/org/apache/inlong/common/enums/TaskTypeEnum.java b/inlong-common/src/main/java/org/apache/inlong/common/enums/TaskTypeEnum.java
index 6d12377d8..6cb11d120 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/enums/TaskTypeEnum.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/enums/TaskTypeEnum.java
@@ -20,7 +20,7 @@ package org.apache.inlong.common.enums;
 import static java.util.Objects.requireNonNull;
 
 public enum TaskTypeEnum {
-    DATABASE_MIGRATION(0),SQL(1), BINLOG(2), FILE(3), KAFKA(4), PULSAR(5);
+    DATABASE_MIGRATION(0),SQL(1), BINLOG(2), FILE(3), KAFKA(4), PULSAR(5), POSTGRES(6);
 
     private int type;
 
@@ -43,6 +43,8 @@ public enum TaskTypeEnum {
                 return KAFKA;
             case 5:
                 return PULSAR;
+            case 6:
+                return POSTGRES;
             default:
                 throw new RuntimeException("such task type doesn't exist");
         }
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/sink/PostgresSink.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/sink/PostgresSink.java
new file mode 100644
index 000000000..90e1b8574
--- /dev/null
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/sink/PostgresSink.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.client.api.sink;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import java.util.List;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.NoArgsConstructor;
+import org.apache.inlong.manager.common.auth.DefaultAuthentication;
+import org.apache.inlong.manager.common.enums.DataFormat;
+import org.apache.inlong.manager.common.enums.SinkType;
+import org.apache.inlong.manager.common.pojo.stream.SinkField;
+import org.apache.inlong.manager.common.pojo.stream.StreamSink;
+
+/**
+ * Client-side Postgres sink definition: JDBC URL, authentication, target database/table, fields and primary key.
+ */
+@Data
+@EqualsAndHashCode(callSuper = true)
+@NoArgsConstructor
+@AllArgsConstructor
+@ApiModel("Postgres sink configuration")
+public class PostgresSink extends StreamSink {
+
+    @ApiModelProperty(value = "Sink type", required = true)
+    private SinkType sinkType = SinkType.POSTGRES;
+
+    @ApiModelProperty("Postgres JDBC URL")
+    private String jdbcUrl;
+
+    @ApiModelProperty("Target database name")
+    private String dbName;
+
+    @ApiModelProperty("Authentication for postgres")
+    private DefaultAuthentication authentication;
+
+    @ApiModelProperty("Target table name")
+    private String tableName;
+
+    @ApiModelProperty("Field definitions for postgres")
+    private List<SinkField> sinkFields;
+
+    @ApiModelProperty("Primary key")
+    private String primaryKey;
+
+    /**
+     * Returns the data format of this sink, which is fixed and not configurable.
+     * @return always {@link DataFormat#NONE}
+     */
+    public DataFormat getDataFormat() {
+        return DataFormat.NONE;
+    }
+}
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/PostgresSource.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/PostgresSource.java
new file mode 100644
index 000000000..c55b0cb65
--- /dev/null
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/PostgresSource.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.client.api.source;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import java.util.List;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.NoArgsConstructor;
+import org.apache.inlong.manager.common.enums.DataFormat;
+import org.apache.inlong.manager.common.enums.SourceType;
+import org.apache.inlong.manager.common.pojo.stream.StreamSource;
+
+/**
+ * Client-side Postgres source definition; defaults to incremental sync and {@link DataFormat#NONE}.
+ */
+@Data
+@EqualsAndHashCode(callSuper = true)
+@AllArgsConstructor
+@NoArgsConstructor
+@ApiModel("Base configuration for Postgres collection")
+public class PostgresSource extends StreamSource {
+
+    @ApiModelProperty(value = "DataSource type", required = true)
+    private SourceType sourceType = SourceType.POSTGRES;
+
+    @ApiModelProperty("SyncType")
+    private SyncType syncType = SyncType.INCREMENT;
+
+    @ApiModelProperty("Data format type")
+    private DataFormat dataFormat = DataFormat.NONE;
+
+    @ApiModelProperty("Db server username")
+    private String username;
+
+    @ApiModelProperty("Db password")
+    private String password;
+
+    @ApiModelProperty("DB Server hostname")
+    private String hostname;
+
+    @ApiModelProperty("DB Server port")
+    private int port;
+
+    @ApiModelProperty("Database name")
+    private String dbName;
+
+    @ApiModelProperty("schema info")
+    private String schema;
+
+    @ApiModelProperty("Data table name list")
+    private List<String> tableNameList;
+
+    @ApiModelProperty("decoding plugin name")
+    private String decodingPluginName;
+
+    @ApiModelProperty("Primary key")
+    private String primaryKey;
+}
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongParser.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongParser.java
index 2b6e8742f..0b8eba701 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongParser.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongParser.java
@@ -41,6 +41,7 @@ import org.apache.inlong.manager.common.pojo.sink.ck.ClickHouseSinkResponse;
 import org.apache.inlong.manager.common.pojo.sink.hive.HiveSinkResponse;
 import org.apache.inlong.manager.common.pojo.sink.iceberg.IcebergSinkResponse;
 import org.apache.inlong.manager.common.pojo.sink.kafka.KafkaSinkResponse;
+import org.apache.inlong.manager.common.pojo.sink.postgres.PostgresSinkResponse;
 import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
 import org.apache.inlong.manager.common.pojo.source.SourceResponse;
 import org.apache.inlong.manager.common.pojo.source.autopush.AutoPushSourceListResponse;
@@ -52,6 +53,8 @@ import org.apache.inlong.manager.common.pojo.source.file.FileSourceListResponse;
 import org.apache.inlong.manager.common.pojo.source.file.FileSourceResponse;
 import org.apache.inlong.manager.common.pojo.source.kafka.KafkaSourceListResponse;
 import org.apache.inlong.manager.common.pojo.source.kafka.KafkaSourceResponse;
+import org.apache.inlong.manager.common.pojo.source.postgres.PostgresSourceListResponse;
+import org.apache.inlong.manager.common.pojo.source.postgres.PostgresSourceResponse;
 import org.apache.inlong.manager.common.pojo.stream.FullStreamResponse;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamApproveRequest;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamConfigLogListResponse;
@@ -169,6 +172,11 @@ public class InlongParser {
                                 AutoPushSourceRequest.class);
                         sourceResponses.add(autoPushSourceResponse);
                         break;
+                    case POSTGRES:
+                        PostgresSourceResponse postgresSourceResponse = GsonUtil.fromJson(sourceJson.toString(),
+                                PostgresSourceResponse.class);
+                        sourceResponses.add(postgresSourceResponse);
+                        break;
                     default:
                         throw new RuntimeException(String.format("Unsupported sourceType=%s for Inlong", sourceType));
                 }
@@ -203,6 +211,11 @@ public class InlongParser {
                                 ClickHouseSinkResponse.class);
                         sinkResponses.add(clickHouseSinkResponse);
                         break;
+                    case POSTGRES:
+                        PostgresSinkResponse postgresSinkResponse = GsonUtil.fromJson(sinkJson.toString(),
+                                PostgresSinkResponse.class);
+                        sinkResponses.add(postgresSinkResponse);
+                        break;
                     default:
                         throw new RuntimeException(String.format("Unsupported sinkType=%s for Inlong", sinkType));
                 }
@@ -240,6 +253,10 @@ public class InlongParser {
                     return GsonUtil.fromJson(pageInfoJson,
                             new TypeToken<PageInfo<AutoPushSourceListResponse>>() {
                             }.getType());
+                case POSTGRES:
+                    return GsonUtil.fromJson(pageInfoJson,
+                            new TypeToken<PageInfo<PostgresSourceListResponse>>() {
+                            }.getType());
                 default:
                     throw new IllegalArgumentException(
                             String.format("Unsupported sourceType=%s for Inlong", sourceType));
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSinkTransfer.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSinkTransfer.java
index 1754dd970..21b543d50 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSinkTransfer.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSinkTransfer.java
@@ -26,6 +26,7 @@ import org.apache.inlong.manager.client.api.sink.HbaseSink;
 import org.apache.inlong.manager.client.api.sink.HiveSink;
 import org.apache.inlong.manager.client.api.sink.KafkaSink;
 import org.apache.inlong.manager.common.auth.DefaultAuthentication;
+import org.apache.inlong.manager.client.api.sink.PostgresSink;
 import org.apache.inlong.manager.common.enums.DataFormat;
 import org.apache.inlong.manager.common.enums.FieldType;
 import org.apache.inlong.manager.common.enums.FileFormat;
@@ -43,6 +44,8 @@ import org.apache.inlong.manager.common.pojo.sink.hive.HiveSinkRequest;
 import org.apache.inlong.manager.common.pojo.sink.hive.HiveSinkResponse;
 import org.apache.inlong.manager.common.pojo.sink.kafka.KafkaSinkRequest;
 import org.apache.inlong.manager.common.pojo.sink.kafka.KafkaSinkResponse;
+import org.apache.inlong.manager.common.pojo.sink.postgres.PostgresSinkRequest;
+import org.apache.inlong.manager.common.pojo.sink.postgres.PostgresSinkResponse;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.common.pojo.stream.SinkField;
 import org.apache.inlong.manager.common.pojo.stream.StreamSink;
@@ -69,6 +72,8 @@ public class InlongStreamSinkTransfer {
                 return createClickHouseRequest(streamSink, streamInfo);
             case HBASE:
                 return createHbaseRequest(streamSink, streamInfo);
+            case POSTGRES:
+                return createPostgresRequest(streamSink, streamInfo);
             default:
                 throw new IllegalArgumentException(String.format("Unsupported sink type : %s for Inlong", sinkType));
         }
@@ -93,6 +98,8 @@ public class InlongStreamSinkTransfer {
                 return parseClickHouseSink((ClickHouseSinkResponse) sinkResponse, streamSink);
             case HBASE:
                 return parseHbaseSink((HbaseSinkResponse) sinkResponse, streamSink);
+            case POSTGRES:
+                return parsePostgresSink((PostgresSinkResponse) sinkResponse, streamSink);
             default:
                 throw new IllegalArgumentException(String.format("Unsupported sink type : %s for Inlong", sinkType));
         }
@@ -432,4 +439,73 @@ public class InlongStreamSinkTransfer {
         return hiveSink;
     }
 
+    /**
+     * Build the sink request that describes a Postgres sink of the given stream.
+     *
+     * @param streamSink sink definition, cast to {@link PostgresSink}
+     * @param streamInfo stream supplying the inlong group id and stream id
+     * @return postgres sinkRequest populated from the sink definition
+     */
+    private static SinkRequest createPostgresRequest(StreamSink streamSink, InlongStreamInfo streamInfo) {
+        PostgresSink sink = (PostgresSink) streamSink;
+        PostgresSinkRequest request = new PostgresSinkRequest();
+        request.setSinkName(sink.getSinkName());
+        request.setSinkType(sink.getSinkType().name());
+        request.setInlongGroupId(streamInfo.getInlongGroupId());
+        request.setInlongStreamId(streamInfo.getInlongStreamId());
+
+        request.setJdbcUrl(sink.getJdbcUrl());
+        request.setDbName(sink.getDbName());
+        request.setTableName(sink.getTableName());
+        request.setPrimaryKey(sink.getPrimaryKey());
+        request.setProperties(sink.getProperties());
+
+        DefaultAuthentication auth = sink.getAuthentication();
+        request.setUsername(auth.getUserName());
+        request.setPassword(auth.getPassword());
+
+        if (CollectionUtils.isNotEmpty(sink.getSinkFields())) {
+            request.setFieldList(createSinkFieldRequests(sink.getSinkFields()));
+        }
+        return request;
+    }
+
+    /**
+     * Convert a Postgres sink response into a {@link PostgresSink}.
+     * Name and connection settings come from the local snapshot when given, otherwise from the response.
+     * @param sinkResponse response to convert
+     * @param sink local snapshot of the sink, may be null
+     * @return postgres streamSink
+     */
+    private static StreamSink parsePostgresSink(PostgresSinkResponse sinkResponse, StreamSink sink) {
+        PostgresSink postgresSink = new PostgresSink();
+        if (sink != null) {
+            // Compare and report the names only; formatting the whole objects could expose credentials.
+            AssertUtils.isTrue(sinkResponse.getSinkName().equals(sink.getSinkName()),
+                    String.format("SinkName is not equal: %s != %s", sinkResponse.getSinkName(), sink.getSinkName()));
+            PostgresSink snapshot = (PostgresSink) sink;
+            postgresSink.setSinkName(snapshot.getSinkName());
+            postgresSink.setAuthentication(snapshot.getAuthentication());
+
+            postgresSink.setJdbcUrl(snapshot.getJdbcUrl());
+            postgresSink.setTableName(snapshot.getTableName());
+            postgresSink.setDbName(snapshot.getDbName());
+        } else {
+            postgresSink.setSinkName(sinkResponse.getSinkName());
+            String password = sinkResponse.getPassword();
+            String uname = sinkResponse.getUsername();
+            postgresSink.setAuthentication(new DefaultAuthentication(uname, password));
+
+            postgresSink.setJdbcUrl(sinkResponse.getJdbcUrl());
+            postgresSink.setTableName(sinkResponse.getTableName());
+            postgresSink.setDbName(sinkResponse.getDbName());
+        }
+        postgresSink.setPrimaryKey(sinkResponse.getPrimaryKey());
+        postgresSink.setProperties(sinkResponse.getProperties());
+        postgresSink.setSinkType(SinkType.POSTGRES);
+        if (CollectionUtils.isNotEmpty(sinkResponse.getFieldList())) {
+            postgresSink.setSinkFields(convertToSinkFields(sinkResponse.getFieldList()));
+        }
+        return postgresSink;
+    }
 }
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java
index 62cf20067..c051d671b 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java
@@ -25,6 +25,7 @@ import org.apache.inlong.manager.client.api.source.AgentFileSource;
 import org.apache.inlong.manager.client.api.source.AutoPushSource;
 import org.apache.inlong.manager.client.api.source.KafkaSource;
 import org.apache.inlong.manager.client.api.source.MySQLBinlogSource;
+import org.apache.inlong.manager.client.api.source.PostgresSource;
 import org.apache.inlong.manager.common.enums.DataFormat;
 import org.apache.inlong.manager.common.enums.SourceType;
 import org.apache.inlong.manager.common.pojo.source.SourceRequest;
@@ -37,6 +38,8 @@ import org.apache.inlong.manager.common.pojo.source.file.FileSourceRequest;
 import org.apache.inlong.manager.common.pojo.source.file.FileSourceResponse;
 import org.apache.inlong.manager.common.pojo.source.kafka.KafkaSourceRequest;
 import org.apache.inlong.manager.common.pojo.source.kafka.KafkaSourceResponse;
+import org.apache.inlong.manager.common.pojo.source.postgres.PostgresSourceRequest;
+import org.apache.inlong.manager.common.pojo.source.postgres.PostgresSourceResponse;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.common.pojo.stream.StreamSource;
 import org.apache.inlong.manager.common.pojo.stream.StreamSource.State;
@@ -60,6 +63,8 @@ public class InlongStreamSourceTransfer {
                 return createFileSourceRequest((AgentFileSource) streamSource, streamInfo);
             case AUTO_PUSH:
                 return createAutoPushSourceRequest((AutoPushSource) streamSource, streamInfo);
+            case POSTGRES:
+                return createPostgresSourceRequest((PostgresSource) streamSource, streamInfo);
             default:
                 throw new RuntimeException(String.format("Unsupported source=%s for Inlong", sourceType));
         }
@@ -80,6 +85,9 @@ public class InlongStreamSourceTransfer {
         if (sourceType == SourceType.AUTO_PUSH && sourceResponse instanceof AutoPushSourceResponse) {
             return parseAutoPushSource((AutoPushSourceResponse) sourceResponse);
         }
+        if (sourceType == SourceType.POSTGRES && sourceResponse instanceof PostgresSourceResponse) {
+            return parsePostgresSource((PostgresSourceResponse) sourceResponse);
+        }
         throw new IllegalArgumentException(String.format("Unsupported source type : %s for Inlong", sourceType));
     }
 
@@ -158,6 +166,26 @@ public class InlongStreamSourceTransfer {
         return autoPushSource;
     }
 
+    /** Convert a Postgres source response into the client-side {@link PostgresSource}. */
+    private static PostgresSource parsePostgresSource(PostgresSourceResponse response) {
+        PostgresSource postgresSource = new PostgresSource();
+        postgresSource.setSourceName(response.getSourceName());
+        postgresSource.setState(State.parseByStatus(response.getStatus()));
+        postgresSource.setDataFormat(DataFormat.NONE);
+        postgresSource.setFields(InlongStreamTransfer.parseStreamFields(response.getFieldList()));
+
+        postgresSource.setDbName(response.getDatabase());
+        postgresSource.setDecodingPluginName(response.getDecodingPluginName());
+        postgresSource.setHostname(response.getHostname());
+        postgresSource.setPassword(response.getPassword());
+        postgresSource.setPort(response.getPort());
+        postgresSource.setSchema(response.getSchema());
+        postgresSource.setTableNameList(response.getTableNames());
+        postgresSource.setUsername(response.getUsername());
+        // NOTE(review): primaryKey is sent by createPostgresSourceRequest but not copied back here - confirm.
+        return postgresSource;
+    }
+
     private static KafkaSourceRequest createKafkaSourceRequest(KafkaSource kafkaSource, InlongStreamInfo streamInfo) {
         KafkaSourceRequest sourceRequest = new KafkaSourceRequest();
         sourceRequest.setSourceName(kafkaSource.getSourceName());
@@ -250,4 +278,28 @@ public class InlongStreamSourceTransfer {
         sourceRequest.setFieldList(InlongStreamTransfer.createStreamFields(source.getFields(), streamInfo));
         return sourceRequest;
     }
+
+    /** Build the Postgres source request; an empty source name falls back to the stream name. */
+    private static PostgresSourceRequest createPostgresSourceRequest(PostgresSource source,
+            InlongStreamInfo streamInfo) {
+        PostgresSourceRequest request = new PostgresSourceRequest();
+        request.setSourceName(source.getSourceName());
+        if (StringUtils.isEmpty(request.getSourceName())) {
+            request.setSourceName(streamInfo.getName());
+        }
+        request.setInlongGroupId(streamInfo.getInlongGroupId());
+        request.setInlongStreamId(streamInfo.getInlongStreamId());
+        request.setHostname(source.getHostname());
+        request.setPort(source.getPort());
+        request.setUsername(source.getUsername());
+        request.setPassword(source.getPassword());
+        request.setDatabase(source.getDbName());
+        request.setSchema(source.getSchema());
+        request.setTableNameList(source.getTableNameList());
+        request.setDecodingPluginName(source.getDecodingPluginName());
+        request.setPrimaryKey(source.getPrimaryKey());
+        request.setSourceType(source.getSourceType().getType());
+        request.setFieldList(InlongStreamTransfer.createStreamFields(source.getFields(), streamInfo));
+        return request;
+    }
 }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SinkType.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SinkType.java
index df7afb60a..e308eb53f 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SinkType.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SinkType.java
@@ -21,13 +21,14 @@ import java.util.Locale;
 
 public enum SinkType {
 
-    HIVE, KAFKA, ICEBERG, CLICKHOUSE, HBASE;
+    HIVE, KAFKA, ICEBERG, CLICKHOUSE, HBASE, POSTGRES;
 
     public static final String SINK_HIVE = "HIVE";
     public static final String SINK_KAFKA = "KAFKA";
     public static final String SINK_HBASE = "HBASE";
     public static final String SINK_ICEBERG = "ICEBERG";
     public static final String SINK_CLICKHOUSE = "CLICKHOUSE";
+    public static final String SINK_POSTGRES = "POSTGRES";
 
     /**
      * Get the SinkType enum via the given sinkType string
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceType.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceType.java
index 524de69e8..848d978a5 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceType.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceType.java
@@ -32,7 +32,8 @@ public enum SourceType {
     SQL("SQL", TaskTypeEnum.SQL),
     BINLOG("BINLOG", TaskTypeEnum.BINLOG),
     KAFKA("KAFKA", TaskTypeEnum.KAFKA),
-    PULSAR("PULSAR", TaskTypeEnum.PULSAR);
+    PULSAR("PULSAR", TaskTypeEnum.PULSAR),
+    POSTGRES("POSTGRES", TaskTypeEnum.POSTGRES);
 
     public static final String SOURCE_AUTO_PUSH = "AUTO_PUSH";
     public static final String SOURCE_FILE = "FILE";
@@ -40,6 +41,7 @@ public enum SourceType {
     public static final String SOURCE_BINLOG = "BINLOG";
     public static final String SOURCE_KAFKA = "KAFKA";
     public static final String SOURCE_PULSAR = "PULSAR";
+    public static final String SOURCE_POSTGRES = "POSTGRES";
 
     @Getter
     private final String type;
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/postgres/PostgresSinkDTO.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/postgres/PostgresSinkDTO.java
new file mode 100644
index 000000000..69f1f5261
--- /dev/null
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/postgres/PostgresSinkDTO.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.pojo.sink.postgres;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.swagger.annotations.ApiModelProperty;
+import java.util.Map;
+import javax.validation.constraints.NotNull;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+
+/**
+ * Postgres sink parameters, built from a sink request or parsed from their JSON form.
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class PostgresSinkDTO {
+    // Shared by every getFromJson call, so it is configured once here and never reconfigured afterwards.
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
+            .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+
+    @ApiModelProperty("postgres JDBC URL")
+    private String jdbcUrl;
+
+    @ApiModelProperty("Username for JDBC URL")
+    private String username;
+
+    @ApiModelProperty("User password")
+    private String password;
+
+    @ApiModelProperty("Target database name")
+    private String dbName;
+
+    @ApiModelProperty("Target table name")
+    private String tableName;
+
+    @ApiModelProperty("Primary key")
+    private String primaryKey;
+
+    @ApiModelProperty("Properties for postgres")
+    private Map<String, Object> properties;
+
+    /**
+     * Build the DTO by copying the Postgres-specific fields of the given sink request.
+     */
+    public static PostgresSinkDTO getFromRequest(PostgresSinkRequest request) {
+        return PostgresSinkDTO.builder()
+                .jdbcUrl(request.getJdbcUrl())
+                .username(request.getUsername())
+                .password(request.getPassword())
+                .dbName(request.getDbName())
+                .primaryKey(request.getPrimaryKey())
+                .tableName(request.getTableName())
+                .properties(request.getProperties())
+                .build();
+    }
+
+    /**
+     * Parse the DTO from its JSON form, ignoring properties the DTO does not declare.
+     * @param extParams JSON text to parse
+     * @return postgres sink DTO
+     * @throws BusinessException with the SINK_INFO_INCORRECT message if parsing fails (the cause is not attached)
+     */
+    public static PostgresSinkDTO getFromJson(@NotNull String extParams) {
+        try {
+            return OBJECT_MAPPER.readValue(extParams, PostgresSinkDTO.class);
+        } catch (Exception e) {
+            throw new BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage());
+        }
+    }
+}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/postgres/PostgresSinkListResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/postgres/PostgresSinkListResponse.java
new file mode 100644
index 000000000..c67210591
--- /dev/null
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/postgres/PostgresSinkListResponse.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.pojo.sink.postgres;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import org.apache.inlong.manager.common.pojo.sink.SinkListResponse;
+
+/**
+ * Response of Postgres sink list.
+ *
+ * <p>Adds the Postgres connection settings and the target database/table to the common
+ * {@link SinkListResponse}.
+ *
+ * <p>NOTE(review): the password is part of this list payload - confirm it is masked or stripped
+ * before it is returned to API clients.
+ */
+@Data
+@EqualsAndHashCode(callSuper = true)
+@ApiModel("Response of Postgres sink paging list")
+public class PostgresSinkListResponse extends SinkListResponse {
+
+    @ApiModelProperty("Username for JDBC URL")
+    private String username;
+
+    @ApiModelProperty("User password")
+    private String password;
+
+    @ApiModelProperty("postgres jdbc url")
+    private String jdbcUrl;
+
+    @ApiModelProperty("Target database name")
+    private String dbName;
+
+    @ApiModelProperty("Target table name")
+    private String tableName;
+
+    @ApiModelProperty("Primary key")
+    private String primaryKey;
+}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/postgres/PostgresSinkRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/postgres/PostgresSinkRequest.java
new file mode 100644
index 000000000..2576622b2
--- /dev/null
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/postgres/PostgresSinkRequest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.pojo.sink.postgres;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.apache.inlong.manager.common.enums.SinkType;
+import org.apache.inlong.manager.common.pojo.sink.SinkRequest;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+
+/**
+ * Request of the Postgres sink info.
+ *
+ * <p>Carries the JDBC connection settings and the target database/table on top of the common
+ * {@link SinkRequest}. The class is tagged with {@link SinkType#SINK_POSTGRES} through
+ * {@link JsonTypeDefine}; presumably that value selects this class when a sink request is
+ * deserialized - verify against the JSON type registration.
+ */
+@Data
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@ApiModel(value = "Request of the Postgres sink info")
+@JsonTypeDefine(value = SinkType.SINK_POSTGRES)
+public class PostgresSinkRequest extends SinkRequest {
+
+    @ApiModelProperty("Postgres JDBC URL")
+    private String jdbcUrl;
+
+    @ApiModelProperty("Username for JDBC URL")
+    private String username;
+
+    @ApiModelProperty("User password")
+    private String password;
+
+    @ApiModelProperty("Target database name")
+    private String dbName;
+
+    @ApiModelProperty("Target table name")
+    private String tableName;
+
+    @ApiModelProperty("Primary key is required when serializationType is json, avro")
+    private String primaryKey;
+}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/postgres/PostgresSinkResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/postgres/PostgresSinkResponse.java
new file mode 100644
index 000000000..707265594
--- /dev/null
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/postgres/PostgresSinkResponse.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.pojo.sink.postgres;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.apache.inlong.manager.common.enums.SinkType;
+import org.apache.inlong.manager.common.pojo.sink.SinkResponse;
+
+/**
+ * Response of the postgres sink.
+ *
+ * <p>Adds the JDBC connection settings and the target database/table to the common
+ * {@link SinkResponse}.
+ */
+@Data
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@ApiModel(value = "Response of the postgres sink")
+public class PostgresSinkResponse extends SinkResponse {
+
+    @ApiModelProperty("postgres jdbc url")
+    private String jdbcUrl;
+
+    @ApiModelProperty("Username for JDBC URL")
+    private String username;
+
+    @ApiModelProperty("User password")
+    private String password;
+
+    @ApiModelProperty("Target database name")
+    private String dbName;
+
+    @ApiModelProperty("Target table name")
+    private String tableName;
+
+    @ApiModelProperty("Primary key is required when serializationType is json, avro")
+    private String primaryKey;
+
+    /**
+     * Create a response whose sink type is preset to {@link SinkType#SINK_POSTGRES}.
+     */
+    public PostgresSinkResponse() {
+        this.sinkType = SinkType.SINK_POSTGRES;
+    }
+}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/postgres/PostgresSourceDTO.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/postgres/PostgresSourceDTO.java
new file mode 100644
index 000000000..7d899b0c8
--- /dev/null
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/postgres/PostgresSourceDTO.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.pojo.source.postgres;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.swagger.annotations.ApiModelProperty;
+import java.util.List;
+import javax.validation.constraints.NotNull;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+
+/**
+ * Postgres source info
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class PostgresSourceDTO {
+
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); // thread safe
+
+    @ApiModelProperty("Username of the DB server")
+    private String username;
+
+    @ApiModelProperty("Password of the DB server")
+    private String password;
+
+    @ApiModelProperty("Hostname of the DB server")
+    private String hostname;
+
+    @ApiModelProperty("Exposed port of the DB server")
+    private int port;
+
+    @ApiModelProperty("schema")
+    private String schema;
+
+    @ApiModelProperty(value = "database name")
+    private String database;
+
+    @ApiModelProperty(value = "List of tables to be collected")
+    private List<String> tableNameList;
+
+    @ApiModelProperty(value = "Primary key must be shared by all tables", required = false)
+    private String primaryKey;
+
+    @ApiModelProperty(value = "decoding pulgin name")
+    private String decodingPluginName;
+
+    /**
+     * Get the dto instance from the request
+     */
+    public static PostgresSourceDTO getFromRequest(PostgresSourceRequest request) {
+        return PostgresSourceDTO.builder()
+                .username(request.getUsername())
+                .password(request.getPassword())
+                .hostname(request.getHostname())
+                .port(request.getPort())
+                .schema(request.getSchema())
+                .database(request.getDatabase())
+                .tableNameList(request.getTableNameList())
+                .primaryKey(request.getPrimaryKey())
+                .decodingPluginName(request.getDecodingPluginName())
+                .build();
+    }
+
+    public static PostgresSourceDTO getFromJson(@NotNull String extParams) {
+        try {
+            OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+            return OBJECT_MAPPER.readValue(extParams, PostgresSourceDTO.class);
+        } catch (Exception e) {
+            throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage());
+        }
+    }
+
+}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/postgres/PostgresSourceListResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/postgres/PostgresSourceListResponse.java
new file mode 100644
index 000000000..429454a32
--- /dev/null
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/postgres/PostgresSourceListResponse.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.pojo.source.postgres;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import java.util.List;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import org.apache.inlong.manager.common.enums.SourceType;
+import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
+
+/**
+ * Response info of postgres source list.
+ *
+ * <p>Adds the Postgres connection, schema and table settings to the common
+ * {@link SourceListResponse}.
+ *
+ * <p>NOTE(review): the password is part of this list payload - confirm it is masked or stripped
+ * before it is returned to API clients.
+ */
+@Data
+@EqualsAndHashCode(callSuper = true)
+@ApiModel("Response of postgres source paging list")
+public class PostgresSourceListResponse extends SourceListResponse {
+
+    @ApiModelProperty("Primary key")
+    private String primaryKey;
+
+    @ApiModelProperty("Username of the DB server")
+    private String username;
+
+    @ApiModelProperty("Password of the DB server")
+    private String password;
+
+    @ApiModelProperty("Hostname of the DB server")
+    private String hostname;
+
+    @ApiModelProperty("Exposed port of the DB server")
+    private int port;
+
+    @ApiModelProperty("Exposed database of the DB")
+    private String database;
+
+    @ApiModelProperty("schema info")
+    private String schema;
+
+    @ApiModelProperty("decoding plugin name")
+    private String decodingPluginName;
+
+    // NOTE(review): the Postgres source request and DTO call this property "tableNameList"; if this
+    // object is filled by a name-based bean copy the value would be left null - confirm.
+    @ApiModelProperty("List of tables")
+    private List<String> tableNames;
+
+    /**
+     * Create a list response whose source type is preset to the Postgres type.
+     */
+    public PostgresSourceListResponse() {
+        this.setSourceType(SourceType.POSTGRES.getType());
+    }
+}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/postgres/PostgresSourceRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/postgres/PostgresSourceRequest.java
new file mode 100644
index 000000000..825b9ad8b
--- /dev/null
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/postgres/PostgresSourceRequest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.pojo.source.postgres;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import java.util.List;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.apache.inlong.manager.common.enums.SourceType;
+import org.apache.inlong.manager.common.pojo.source.SourceRequest;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+
+/**
+ * Request of postgres source.
+ *
+ * <p>Carries the Postgres connection, schema and table settings on top of the common
+ * {@link SourceRequest}.
+ */
+@Data
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@ApiModel(value = "Request of the postgres source info")
+@JsonTypeDefine(value = SourceType.SOURCE_POSTGRES)
+public class PostgresSourceRequest extends SourceRequest {
+
+    @ApiModelProperty("Primary key")
+    private String primaryKey;
+
+    @ApiModelProperty("Username of the DB server")
+    private String username;
+
+    @ApiModelProperty("Password of the DB server")
+    private String password;
+
+    @ApiModelProperty("Hostname of the DB server")
+    private String hostname;
+
+    @ApiModelProperty("Exposed port of the DB server")
+    private int port;
+
+    @ApiModelProperty("Exposed database of the DB")
+    private String database;
+
+    @ApiModelProperty("schema info")
+    private String schema;
+
+    @ApiModelProperty("decoding plugin name")
+    private String decodingPluginName;
+
+    @ApiModelProperty("List of tables")
+    private List<String> tableNameList;
+
+    /**
+     * Create a request whose source type is preset to the Postgres type.
+     */
+    public PostgresSourceRequest() {
+        // Use the same constant as the @JsonTypeDefine annotation above so that the declared type
+        // id and the default source type cannot drift apart.
+        this.setSourceType(SourceType.SOURCE_POSTGRES);
+    }
+
+}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/postgres/PostgresSourceResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/postgres/PostgresSourceResponse.java
new file mode 100644
index 000000000..b1d64d010
--- /dev/null
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/postgres/PostgresSourceResponse.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.pojo.source.postgres;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import java.util.List;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.apache.inlong.manager.common.pojo.source.SourceResponse;
+
+/**
+ * Response of the postgres source.
+ *
+ * <p>Adds the Postgres connection, schema and table settings to the common
+ * {@link SourceResponse}.
+ */
+@Data
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@ApiModel(value = "Response of the postgres source")
+public class PostgresSourceResponse extends SourceResponse {
+
+    @ApiModelProperty("Primary key")
+    private String primaryKey;
+
+    @ApiModelProperty("Username of the DB server")
+    private String username;
+
+    @ApiModelProperty("Password of the DB server")
+    private String password;
+
+    @ApiModelProperty("Hostname of the DB server")
+    private String hostname;
+
+    @ApiModelProperty("Exposed port of the DB server")
+    private int port;
+
+    @ApiModelProperty("Exposed database of the DB")
+    private String database;
+
+    @ApiModelProperty("schema info")
+    private String schema;
+
+    @ApiModelProperty("decoding plugin name")
+    private String decodingPluginName;
+
+    // NOTE(review): the Postgres source request and DTO call this property "tableNameList"; if this
+    // object is filled by a name-based bean copy the value would be left null - confirm.
+    @ApiModelProperty("List of tables")
+    private List<String> tableNames;
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/postgres/PostgresSinkOperation.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/postgres/PostgresSinkOperation.java
new file mode 100644
index 000000000..60c4e7d85
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/postgres/PostgresSinkOperation.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.sink.postgres;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.github.pagehelper.Page;
+import com.github.pagehelper.PageInfo;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.function.Supplier;
+import javax.validation.constraints.NotNull;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.enums.GlobalConstants;
+import org.apache.inlong.manager.common.enums.SinkStatus;
+import org.apache.inlong.manager.common.enums.SinkType;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.pojo.sink.SinkFieldRequest;
+import org.apache.inlong.manager.common.pojo.sink.SinkFieldResponse;
+import org.apache.inlong.manager.common.pojo.sink.SinkListResponse;
+import org.apache.inlong.manager.common.pojo.sink.SinkRequest;
+import org.apache.inlong.manager.common.pojo.sink.SinkResponse;
+import org.apache.inlong.manager.common.pojo.sink.postgres.PostgresSinkDTO;
+import org.apache.inlong.manager.common.pojo.sink.postgres.PostgresSinkListResponse;
+import org.apache.inlong.manager.common.pojo.sink.postgres.PostgresSinkRequest;
+import org.apache.inlong.manager.common.pojo.sink.postgres.PostgresSinkResponse;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
+import org.apache.inlong.manager.dao.entity.StreamSinkFieldEntity;
+import org.apache.inlong.manager.dao.mapper.StreamSinkEntityMapper;
+import org.apache.inlong.manager.dao.mapper.StreamSinkFieldEntityMapper;
+import org.apache.inlong.manager.service.sink.StreamSinkOperation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+/**
+ * Postgres sink operation
+ */
+@Service
+public class PostgresSinkOperation implements StreamSinkOperation {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(PostgresSinkOperation.class);
+
+    @Autowired
+    private ObjectMapper objectMapper;
+    @Autowired
+    private StreamSinkEntityMapper sinkMapper;
+    @Autowired
+    private StreamSinkFieldEntityMapper sinkFieldMapper;
+
+    @Override
+    public Boolean accept(SinkType sinkType) {
+        return SinkType.POSTGRES.equals(sinkType);
+    }
+
+    @Override
+    public Integer saveOpt(SinkRequest request, String operator) {
+        String sinkType = request.getSinkType();
+        Preconditions.checkTrue(SinkType.SINK_POSTGRES.equals(sinkType),
+                ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT.getMessage() + ": " + sinkType);
+
+        PostgresSinkRequest postgresSinkRequest = (PostgresSinkRequest) request;
+        StreamSinkEntity entity = CommonBeanUtils.copyProperties(postgresSinkRequest, StreamSinkEntity::new);
+        entity.setStatus(SinkStatus.NEW.getCode());
+        entity.setIsDeleted(GlobalConstants.UN_DELETED);
+        entity.setCreator(operator);
+        entity.setModifier(operator);
+        Date now = new Date();
+        entity.setCreateTime(now);
+        entity.setModifyTime(now);
+
+        // get the ext params
+        PostgresSinkDTO dto = PostgresSinkDTO.getFromRequest(postgresSinkRequest);
+        try {
+            entity.setExtParams(objectMapper.writeValueAsString(dto));
+        } catch (Exception e) {
+            throw new BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED);
+        }
+        sinkMapper.insert(entity);
+        Integer sinkId = entity.getId();
+        request.setId(sinkId);
+        this.saveFieldOpt(request);
+        return sinkId;
+    }
+
+    @Override
+    public void saveFieldOpt(SinkRequest request) {
+        List<SinkFieldRequest> fieldList = request.getFieldList();
+        LOGGER.info("begin to save field={}", fieldList);
+        if (CollectionUtils.isEmpty(fieldList)) {
+            return;
+        }
+
+        int size = fieldList.size();
+        List<StreamSinkFieldEntity> entityList = new ArrayList<>(size);
+        String groupId = request.getInlongGroupId();
+        String streamId = request.getInlongStreamId();
+        String sinkType = request.getSinkType();
+        Integer sinkId = request.getId();
+        for (SinkFieldRequest fieldInfo : fieldList) {
+            StreamSinkFieldEntity fieldEntity = CommonBeanUtils.copyProperties(fieldInfo, StreamSinkFieldEntity::new);
+            if (StringUtils.isEmpty(fieldEntity.getFieldComment())) {
+                fieldEntity.setFieldComment(fieldEntity.getFieldName());
+            }
+            fieldEntity.setInlongGroupId(groupId);
+            fieldEntity.setInlongStreamId(streamId);
+            fieldEntity.setSinkType(sinkType);
+            fieldEntity.setSinkId(sinkId);
+            fieldEntity.setIsDeleted(GlobalConstants.UN_DELETED);
+            entityList.add(fieldEntity);
+        }
+
+        sinkFieldMapper.insertAll(entityList);
+        LOGGER.info("success to save field");
+    }
+
+    @Override
+    public SinkResponse getByEntity(@NotNull StreamSinkEntity entity) {
+        Preconditions.checkNotNull(entity, ErrorCodeEnum.SINK_INFO_NOT_FOUND.getMessage());
+        String existType = entity.getSinkType();
+        Preconditions.checkTrue(SinkType.SINK_POSTGRES.equals(existType),
+                String.format(ErrorCodeEnum.SINK_TYPE_NOT_SAME.getMessage(), SinkType.SINK_POSTGRES, existType));
+        SinkResponse response = this.getFromEntity(entity, PostgresSinkResponse::new);
+        List<StreamSinkFieldEntity> entities = sinkFieldMapper.selectBySinkId(entity.getId());
+        List<SinkFieldResponse> infos = CommonBeanUtils.copyListProperties(entities, SinkFieldResponse::new);
+        response.setFieldList(infos);
+        return response;
+    }
+
+    @Override
+    public <T> T getFromEntity(StreamSinkEntity entity, Supplier<T> target) {
+        T result = target.get();
+        if (entity == null) {
+            return result;
+        }
+        String existType = entity.getSinkType();
+        Preconditions.checkTrue(SinkType.SINK_POSTGRES.equals(existType),
+                String.format(ErrorCodeEnum.SINK_TYPE_NOT_SAME.getMessage(), SinkType.SINK_POSTGRES, existType));
+
+        PostgresSinkDTO dto = PostgresSinkDTO.getFromJson(entity.getExtParams());
+        CommonBeanUtils.copyProperties(entity, result, true);
+        CommonBeanUtils.copyProperties(dto, result, true);
+
+        return result;
+    }
+
+    @Override
+    public PageInfo<? extends SinkListResponse> getPageInfo(Page<StreamSinkEntity> entityPage) {
+        if (CollectionUtils.isEmpty(entityPage)) {
+            return new PageInfo<>();
+        }
+        return entityPage.toPageInfo(entity -> this.getFromEntity(entity, PostgresSinkListResponse::new));
+    }
+
+    @Override
+    public void updateOpt(SinkRequest request, String operator) {
+        String sinkType = request.getSinkType();
+        Preconditions.checkTrue(SinkType.SINK_POSTGRES.equals(sinkType),
+                String.format(ErrorCodeEnum.SINK_TYPE_NOT_SAME.getMessage(), SinkType.SINK_POSTGRES, sinkType));
+
+        StreamSinkEntity entity = sinkMapper.selectByPrimaryKey(request.getId());
+        Preconditions.checkNotNull(entity, ErrorCodeEnum.SINK_INFO_NOT_FOUND.getMessage());
+        PostgresSinkRequest postgresSinkRequest = (PostgresSinkRequest) request;
+        CommonBeanUtils.copyProperties(postgresSinkRequest, entity, true);
+        try {
+            PostgresSinkDTO dto = PostgresSinkDTO.getFromRequest(postgresSinkRequest);
+            entity.setExtParams(objectMapper.writeValueAsString(dto));
+        } catch (Exception e) {
+            throw new BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage());
+        }
+
+        entity.setPreviousStatus(entity.getStatus());
+        entity.setStatus(SinkStatus.CONFIG_ING.getCode());
+        entity.setModifier(operator);
+        entity.setModifyTime(new Date());
+        sinkMapper.updateByPrimaryKeySelective(entity);
+
+        boolean onlyAdd = SinkStatus.CONFIG_SUCCESSFUL.getCode().equals(entity.getPreviousStatus());
+        this.updateFieldOpt(onlyAdd, postgresSinkRequest);
+
+        LOGGER.info("success to update sink of type={}", sinkType);
+    }
+
+    @Override
+    public void updateFieldOpt(Boolean onlyAdd, SinkRequest request) {
+        Integer sinkId = request.getId();
+        List<SinkFieldRequest> fieldRequestList = request.getFieldList();
+        if (CollectionUtils.isEmpty(fieldRequestList)) {
+            return;
+        }
+        if (onlyAdd) {
+            List<StreamSinkFieldEntity> existsFieldList = sinkFieldMapper.selectBySinkId(sinkId);
+            if (existsFieldList.size() > fieldRequestList.size()) {
+                throw new BusinessException(ErrorCodeEnum.SINK_FIELD_UPDATE_NOT_ALLOWED);
+            }
+            for (int i = 0; i < existsFieldList.size(); i++) {
+                if (!existsFieldList.get(i).getFieldName().equals(fieldRequestList.get(i).getFieldName())) {
+                    throw new BusinessException(ErrorCodeEnum.SINK_FIELD_UPDATE_NOT_ALLOWED);
+                }
+            }
+        }
+        // First physically delete the existing fields
+        sinkFieldMapper.deleteAll(sinkId);
+        // Then batch save the sink fields
+        this.saveFieldOpt(request);
+        LOGGER.info("success to update field");
+    }
+
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java
index 62c917500..75f8998a0 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java
@@ -30,6 +30,7 @@ import org.apache.inlong.manager.common.pojo.source.binlog.BinlogSourceResponse;
 import org.apache.inlong.manager.common.pojo.source.kafka.KafkaOffset;
 import org.apache.inlong.manager.common.pojo.source.kafka.KafkaSourceResponse;
 import org.apache.inlong.manager.common.pojo.source.pulsar.PulsarSourceResponse;
+import org.apache.inlong.manager.common.pojo.source.postgres.PostgresSourceResponse;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamFieldInfo;
 import org.apache.inlong.sort.protocol.FieldInfo;
 import org.apache.inlong.sort.protocol.enums.ScanStartupMode;
@@ -37,6 +38,7 @@ import org.apache.inlong.sort.protocol.node.ExtractNode;
 import org.apache.inlong.sort.protocol.node.extract.KafkaExtractNode;
 import org.apache.inlong.sort.protocol.node.extract.MySqlExtractNode;
 import org.apache.inlong.sort.protocol.node.extract.PulsarExtractNode;
+import org.apache.inlong.sort.protocol.node.extract.PostgresExtractNode;
 import org.apache.inlong.sort.protocol.node.format.AvroFormat;
 import org.apache.inlong.sort.protocol.node.format.CanalJsonFormat;
 import org.apache.inlong.sort.protocol.node.format.CsvFormat;
@@ -71,6 +73,8 @@ public class ExtractNodeUtils {
                 return createExtractNode((KafkaSourceResponse) sourceResponse);
             case PULSAR:
                 return createExtractNode((PulsarSourceResponse) sourceResponse);
+            case POSTGRES:
+                return createExtractNode((PostgresSourceResponse) sourceResponse);
             default:
                 throw new IllegalArgumentException(
                         String.format("Unsupported sourceType=%s to create extractNode", sourceType));
@@ -249,4 +253,29 @@ public class ExtractNodeUtils {
                 startupMode.getValue(),
                 primaryKey);
     }
+
+    /**
+     * Create the PostgresExtractNode for the given PostgresSourceResponse.
+     * The source name is used both as the node id and as the node name.
+     *
+     * @param postgresSourceResponse postgres source response
+     * @return postgres extract node info
+     */
+    public static PostgresExtractNode createExtractNode(PostgresSourceResponse postgresSourceResponse) {
+        final String sourceName = postgresSourceResponse.getSourceName();
+        List<FieldInfo> fieldInfos = postgresSourceResponse.getFieldList().stream()
+                .map(field -> FieldInfoUtils.parseStreamFieldInfo(field, sourceName))
+                .collect(Collectors.toList());
+        // The 4th and 5th constructor arguments are passed as null.
+        return new PostgresExtractNode(sourceName, sourceName, fieldInfos, null, null,
+                postgresSourceResponse.getPrimaryKey(),
+                postgresSourceResponse.getTableNames(),
+                postgresSourceResponse.getHostname(),
+                postgresSourceResponse.getUsername(),
+                postgresSourceResponse.getPassword(),
+                postgresSourceResponse.getDatabase(),
+                postgresSourceResponse.getSchema(),
+                postgresSourceResponse.getPort(),
+                postgresSourceResponse.getDecodingPluginName());
+    }
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/LoadNodeUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/LoadNodeUtils.java
index 2cd25a033..504f4fc73 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/LoadNodeUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/LoadNodeUtils.java
@@ -27,6 +27,7 @@ import org.apache.inlong.manager.common.pojo.sink.SinkResponse;
 import org.apache.inlong.manager.common.pojo.sink.hbase.HbaseSinkResponse;
 import org.apache.inlong.manager.common.pojo.sink.hive.HiveSinkResponse;
 import org.apache.inlong.manager.common.pojo.sink.kafka.KafkaSinkResponse;
+import org.apache.inlong.manager.common.pojo.sink.postgres.PostgresSinkResponse;
 import org.apache.inlong.sort.protocol.FieldInfo;
 import org.apache.inlong.sort.protocol.node.LoadNode;
 import org.apache.inlong.sort.protocol.node.format.AvroFormat;
@@ -38,6 +39,7 @@ import org.apache.inlong.sort.protocol.node.format.JsonFormat;
 import org.apache.inlong.sort.protocol.node.load.HbaseLoadNode;
 import org.apache.inlong.sort.protocol.node.load.HiveLoadNode;
 import org.apache.inlong.sort.protocol.node.load.KafkaLoadNode;
+import org.apache.inlong.sort.protocol.node.load.PostgresLoadNode;
 import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
 
 import java.util.List;
@@ -72,6 +74,8 @@ public class LoadNodeUtils {
                 return createLoadNode((HiveSinkResponse) sinkResponse);
             case HBASE:
                 return createLoadNode((HbaseSinkResponse) sinkResponse);
+            case POSTGRES:
+                return createLoadNode((PostgresSinkResponse) sinkResponse);
             default:
                 throw new IllegalArgumentException(
                         String.format("Unsupported sinkType=%s to create loadNode", sinkType));
@@ -222,6 +226,31 @@ public class LoadNodeUtils {
     }
 
     /**
+     * Create the Postgres load node for the given Postgres sink response.
+     * The sink name is used both as the node id and as the node name, and the
+     * table name handed to the node is built as "dbName.tableName".
+     *
+     * @param postgresSinkResponse postgresSinkResponse
+     * @return postgres load node
+     */
+    public static PostgresLoadNode createLoadNode(PostgresSinkResponse postgresSinkResponse) {
+        final String sinkName = postgresSinkResponse.getSinkName();
+        final List<SinkFieldResponse> sinkFields = postgresSinkResponse.getFieldList();
+        List<FieldInfo> fieldInfos = sinkFields.stream()
+                .map(sinkField -> FieldInfoUtils.parseSinkFieldInfo(sinkField, sinkName))
+                .collect(Collectors.toList());
+        List<FieldRelationShip> relationShips = parseSinkFields(sinkFields, sinkName);
+        final String tableIdentifier = postgresSinkResponse.getDbName() + "." + postgresSinkResponse.getTableName();
+        // The 5th, 6th and 8th constructor arguments are passed as null; the 7th is fixed to 1.
+        return new PostgresLoadNode(sinkName, sinkName, fieldInfos, relationShips, null, null, 1, null,
+                postgresSinkResponse.getJdbcUrl(),
+                postgresSinkResponse.getUsername(),
+                postgresSinkResponse.getPassword(),
+                tableIdentifier,
+                postgresSinkResponse.getPrimaryKey());
+    }
+
+    /**
      * Parse information field of data sink.
      */
     public static List<FieldRelationShip> parseSinkFields(List<SinkFieldResponse> sinkFieldResponses, String sinkName) {
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/postgres/PostgresSourceOperation.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/postgres/PostgresSourceOperation.java
new file mode 100644
index 000000000..6f813f9a4
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/postgres/PostgresSourceOperation.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.source.postgres;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.github.pagehelper.Page;
+import com.github.pagehelper.PageInfo;
+import java.util.function.Supplier;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.enums.SourceType;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
+import org.apache.inlong.manager.common.pojo.source.SourceRequest;
+import org.apache.inlong.manager.common.pojo.source.SourceResponse;
+import org.apache.inlong.manager.common.pojo.source.postgres.PostgresSourceDTO;
+import org.apache.inlong.manager.common.pojo.source.postgres.PostgresSourceListResponse;
+import org.apache.inlong.manager.common.pojo.source.postgres.PostgresSourceRequest;
+import org.apache.inlong.manager.common.pojo.source.postgres.PostgresSourceResponse;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
+import org.apache.inlong.manager.service.source.AbstractSourceOperation;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+/**
+ * Postgres stream source operation.
+ *
+ * <p>Postgres-specific parameters are stored as a JSON string in the extParams of the source entity.
+ */
+@Service
+public class PostgresSourceOperation extends AbstractSourceOperation {
+
+    @Autowired
+    private ObjectMapper objectMapper;
+
+    /**
+     * Check whether this operation handles the given source type.
+     *
+     * @param sourceType source type to check
+     * @return true only if the type is {@link SourceType#POSTGRES}
+     */
+    @Override
+    public Boolean accept(SourceType sourceType) {
+        return SourceType.POSTGRES == sourceType;
+    }
+
+    /**
+     * Get the source type string handled by this operation.
+     *
+     * @return the type of {@link SourceType#POSTGRES}
+     */
+    @Override
+    protected String getSourceType() {
+        return SourceType.POSTGRES.getType();
+    }
+
+    /**
+     * Create an empty response object for this source type.
+     *
+     * @return a new {@link PostgresSourceResponse}
+     */
+    @Override
+    protected SourceResponse getResponse() {
+        return new PostgresSourceResponse();
+    }
+
+    /**
+     * Convert a page of source entities into a page of Postgres source list responses.
+     *
+     * @param entityPage page of stream source entities
+     * @return an empty page info if the page is null or empty, otherwise the converted page info
+     */
+    @Override
+    public PageInfo<? extends SourceListResponse> getPageInfo(Page<StreamSourceEntity> entityPage) {
+        if (CollectionUtils.isEmpty(entityPage)) {
+            return new PageInfo<>();
+        }
+        return entityPage.toPageInfo(entity -> this.getFromEntity(entity, PostgresSourceListResponse::new));
+    }
+
+    /**
+     * Copy the request properties into the target entity, and save the Postgres-specific
+     * parameters of the request as a JSON string in the extParams of the entity.
+     *
+     * @param request source request, cast to {@link PostgresSourceRequest}
+     * @param targetEntity entity to be filled
+     * @throws BusinessException if the Postgres parameters cannot be built or serialized
+     */
+    @Override
+    protected void setTargetEntity(SourceRequest request, StreamSourceEntity targetEntity) {
+        PostgresSourceRequest sourceRequest = (PostgresSourceRequest) request;
+        CommonBeanUtils.copyProperties(sourceRequest, targetEntity, true);
+        try {
+            PostgresSourceDTO dto = PostgresSourceDTO.getFromRequest(sourceRequest);
+            targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
+        } catch (Exception e) {
+            // Keep the underlying reason in the message, otherwise the failure cannot be diagnosed
+            throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage() + ": " + e.getMessage());
+        }
+    }
+
+    /**
+     * Build an object of the target type from the given source entity.
+     *
+     * @param entity source entity, may be null
+     * @param target supplier of the result object
+     * @param <T> type of the result object
+     * @return the supplied object as-is if the entity is null, otherwise the supplied object
+     *         filled from the entity and from the DTO parsed out of its extParams
+     */
+    @Override
+    public <T> T getFromEntity(StreamSourceEntity entity, Supplier<T> target) {
+        T result = target.get();
+        if (entity == null) {
+            return result;
+        }
+        String existType = entity.getSourceType();
+        // The stored type is checked against the type handled by this operation
+        Preconditions.checkTrue(getSourceType().equals(existType),
+                String.format(ErrorCodeEnum.SOURCE_TYPE_NOT_SAME.getMessage(), getSourceType(), existType));
+        PostgresSourceDTO dto = PostgresSourceDTO.getFromJson(entity.getExtParams());
+        // Entity properties are copied first, then the DTO properties
+        CommonBeanUtils.copyProperties(entity, result, true);
+        CommonBeanUtils.copyProperties(dto, result, true);
+        return result;
+    }
+}
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/PostgresStreamSinkServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/PostgresStreamSinkServiceTest.java
new file mode 100644
index 000000000..f5eda55fe
--- /dev/null
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/PostgresStreamSinkServiceTest.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.core.sink;
+
+import org.apache.inlong.manager.common.enums.GlobalConstants;
+import org.apache.inlong.manager.common.enums.SinkType;
+import org.apache.inlong.manager.common.pojo.sink.SinkResponse;
+import org.apache.inlong.manager.common.pojo.sink.postgres.PostgresSinkRequest;
+import org.apache.inlong.manager.common.pojo.sink.postgres.PostgresSinkResponse;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.service.ServiceBaseTest;
+import org.apache.inlong.manager.service.core.impl.InlongStreamServiceTest;
+import org.apache.inlong.manager.service.sink.StreamSinkService;
+import org.junit.Assert;
+import org.junit.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+
+/**
+ * Test for saving, getting, updating and deleting Postgres sinks through the stream sink service.
+ */
+public class PostgresStreamSinkServiceTest extends ServiceBaseTest {
+
+    private static final String globalGroupId = "b_group1";
+    private static final String globalStreamId = "stream1";
+    private static final String globalOperator = "admin";
+
+    @Autowired
+    private StreamSinkService sinkService;
+    @Autowired
+    private InlongStreamServiceTest streamServiceTest;
+
+    /**
+     * Save an inlong stream, then save a Postgres sink with the given name for it.
+     *
+     * @param sinkName name of the sink to save
+     * @return the value returned by the sink service when saving the sink
+     */
+    public Integer saveSink(String sinkName) {
+        streamServiceTest.saveInlongStream(globalGroupId, globalStreamId, globalOperator);
+
+        PostgresSinkRequest request = new PostgresSinkRequest();
+        request.setInlongGroupId(globalGroupId);
+        request.setInlongStreamId(globalStreamId);
+        request.setSinkType(SinkType.SINK_POSTGRES);
+        request.setSinkName(sinkName);
+        request.setEnableCreateResource(GlobalConstants.DISABLE_CREATE_RESOURCE);
+
+        // Postgres connection and target table settings
+        request.setJdbcUrl("jdbc:postgresql://localhost:5432/postgres");
+        request.setUsername("postgres");
+        request.setPassword("inlong");
+        request.setDbName("public");
+        request.setTableName("user");
+        request.setPrimaryKey("name,age");
+
+        return sinkService.save(request, globalOperator);
+    }
+
+    /**
+     * Delete the Postgres sink with the given id and assert that the deletion succeeded.
+     *
+     * @param postgresSinkId id of the sink to delete
+     */
+    public void deletePostgresSink(Integer postgresSinkId) {
+        Assert.assertTrue(sinkService.delete(postgresSinkId, globalOperator));
+    }
+
+    /**
+     * Save a sink, get it by id and check its group id, then delete it.
+     */
+    @Test
+    public void testListByIdentifier() {
+        Integer sinkId = this.saveSink("postgres_default1");
+        SinkResponse saved = sinkService.get(sinkId);
+        Assert.assertEquals(globalGroupId, saved.getInlongGroupId());
+        deletePostgresSink(sinkId);
+    }
+
+    /**
+     * Save a sink, get it by id, set enableCreateResource to ENABLE_CREATE_RESOURCE, update the
+     * sink with a request copied from the response, then delete it.
+     */
+    @Test
+    public void testGetAndUpdate() {
+        Integer sinkId = this.saveSink("postgres_default2");
+        SinkResponse fetched = sinkService.get(sinkId);
+        Assert.assertEquals(globalGroupId, fetched.getInlongGroupId());
+
+        PostgresSinkResponse postgresResponse = (PostgresSinkResponse) fetched;
+        postgresResponse.setEnableCreateResource(GlobalConstants.ENABLE_CREATE_RESOURCE);
+        PostgresSinkRequest updateRequest =
+                CommonBeanUtils.copyProperties(postgresResponse, PostgresSinkRequest::new);
+
+        Assert.assertTrue(sinkService.update(updateRequest, globalOperator));
+        deletePostgresSink(sinkId);
+    }
+
+}