You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/05/23 06:59:51 UTC
[incubator-inlong] branch master updated: [INLONG-4240][Manager] Support Postgres source and sink (#4241)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 7e8ef88b7 [INLONG-4240][Manager] Support Postgres source and sink (#4241)
7e8ef88b7 is described below
commit 7e8ef88b7571af5fc1826981172634ae32b31e21
Author: baomingyu <ba...@163.com>
AuthorDate: Mon May 23 01:59:46 2022 -0500
[INLONG-4240][Manager] Support Postgres source and sink (#4241)
---
.../apache/inlong/common/enums/TaskTypeEnum.java | 4 +-
.../manager/client/api/sink/PostgresSink.java | 71 +++++++
.../manager/client/api/source/PostgresSource.java | 76 +++++++
.../manager/client/api/util/InlongParser.java | 17 ++
.../client/api/util/InlongStreamSinkTransfer.java | 76 +++++++
.../api/util/InlongStreamSourceTransfer.java | 52 +++++
.../inlong/manager/common/enums/SinkType.java | 3 +-
.../inlong/manager/common/enums/SourceType.java | 4 +-
.../common/pojo/sink/postgres/PostgresSinkDTO.java | 93 +++++++++
.../sink/postgres/PostgresSinkListResponse.java | 51 +++++
.../pojo/sink/postgres/PostgresSinkRequest.java | 56 +++++
.../pojo/sink/postgres/PostgresSinkResponse.java | 58 ++++++
.../pojo/source/postgres/PostgresSourceDTO.java | 96 +++++++++
.../postgres/PostgresSourceListResponse.java | 66 ++++++
.../source/postgres/PostgresSourceRequest.java | 71 +++++++
.../source/postgres/PostgresSourceResponse.java | 63 ++++++
.../sink/postgres/PostgresSinkOperation.java | 229 +++++++++++++++++++++
.../service/sort/util/ExtractNodeUtils.java | 25 +++
.../manager/service/sort/util/LoadNodeUtils.java | 27 +++
.../source/postgres/PostgresSourceOperation.java | 100 +++++++++
.../core/sink/PostgresStreamSinkServiceTest.java | 102 +++++++++
21 files changed, 1337 insertions(+), 3 deletions(-)
diff --git a/inlong-common/src/main/java/org/apache/inlong/common/enums/TaskTypeEnum.java b/inlong-common/src/main/java/org/apache/inlong/common/enums/TaskTypeEnum.java
index 6d12377d8..6cb11d120 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/enums/TaskTypeEnum.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/enums/TaskTypeEnum.java
@@ -20,7 +20,7 @@ package org.apache.inlong.common.enums;
import static java.util.Objects.requireNonNull;
public enum TaskTypeEnum {
- DATABASE_MIGRATION(0),SQL(1), BINLOG(2), FILE(3), KAFKA(4), PULSAR(5);
+ DATABASE_MIGRATION(0),SQL(1), BINLOG(2), FILE(3), KAFKA(4), PULSAR(5), POSTGRES(6);
private int type;
@@ -43,6 +43,8 @@ public enum TaskTypeEnum {
return KAFKA;
case 5:
return PULSAR;
+ case 6:
+ return POSTGRES;
default:
throw new RuntimeException("such task type doesn't exist");
}
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/sink/PostgresSink.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/sink/PostgresSink.java
new file mode 100644
index 000000000..90e1b8574
--- /dev/null
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/sink/PostgresSink.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.client.api.sink;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import java.util.List;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.NoArgsConstructor;
+import org.apache.inlong.manager.common.auth.DefaultAuthentication;
+import org.apache.inlong.manager.common.enums.DataFormat;
+import org.apache.inlong.manager.common.enums.SinkType;
+import org.apache.inlong.manager.common.pojo.stream.SinkField;
+import org.apache.inlong.manager.common.pojo.stream.StreamSink;
+
+/**
+ * Client-side description of a Postgres sink.
+ *
+ * <p>Extends {@link StreamSink} with the JDBC URL, target database and table, authentication,
+ * field definitions and primary key. The sink type defaults to {@link SinkType#POSTGRES}.
+ *
+ * <p>The all-args constructor generated by Lombok takes its parameters in field declaration
+ * order, so reordering the fields below changes that constructor's signature.
+ */
+@Data
+@EqualsAndHashCode(callSuper = true)
+@NoArgsConstructor
+@AllArgsConstructor
+@ApiModel("Postgres sink configuration")
+public class PostgresSink extends StreamSink {
+
+ @ApiModelProperty(value = "Sink type", required = true)
+ private SinkType sinkType = SinkType.POSTGRES;
+
+ @ApiModelProperty("Postgres JDBC URL")
+ private String jdbcUrl;
+
+ @ApiModelProperty("Target database name")
+ private String dbName;
+
+ @ApiModelProperty("Authentication for postgres")
+ private DefaultAuthentication authentication;
+
+ @ApiModelProperty("Target table name")
+ private String tableName;
+
+ @ApiModelProperty("Field definitions for postgres")
+ private List<SinkField> sinkFields;
+
+ @ApiModelProperty("Primary key")
+ private String primaryKey;
+
+ /**
+ * Get the data format of this sink.
+ *
+ * @return always {@link DataFormat#NONE}
+ */
+ public DataFormat getDataFormat() {
+ return DataFormat.NONE;
+ }
+}
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/PostgresSource.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/PostgresSource.java
new file mode 100644
index 000000000..c55b0cb65
--- /dev/null
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/PostgresSource.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.client.api.source;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import java.util.List;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.NoArgsConstructor;
+import org.apache.inlong.manager.common.enums.DataFormat;
+import org.apache.inlong.manager.common.enums.SourceType;
+import org.apache.inlong.manager.common.pojo.stream.StreamSource;
+
+/**
+ * Client-side description of a Postgres source.
+ *
+ * <p>Extends {@link StreamSource} with the connection settings (hostname, port, username,
+ * password), the database, schema and tables to read, the decoding plugin name and the
+ * primary key. Defaults: source type {@link SourceType#POSTGRES}, sync type
+ * {@code SyncType.INCREMENT}, data format {@link DataFormat#NONE}.
+ *
+ * <p>The all-args constructor generated by Lombok takes its parameters in field declaration
+ * order, so reordering the fields below changes that constructor's signature.
+ */
+@Data
+@EqualsAndHashCode(callSuper = true)
+@AllArgsConstructor
+@NoArgsConstructor
+@ApiModel("Base configuration for Postgres collection")
+public class PostgresSource extends StreamSource {
+
+ @ApiModelProperty(value = "DataSource type", required = true)
+ private SourceType sourceType = SourceType.POSTGRES;
+
+ @ApiModelProperty("SyncType")
+ private SyncType syncType = SyncType.INCREMENT;
+
+ @ApiModelProperty("Data format type")
+ private DataFormat dataFormat = DataFormat.NONE;
+
+ @ApiModelProperty("Db server username")
+ private String username;
+
+ @ApiModelProperty("Db password")
+ private String password;
+
+ @ApiModelProperty("DB Server hostname")
+ private String hostname;
+
+ @ApiModelProperty("DB Server port")
+ private int port;
+
+ @ApiModelProperty("Database name")
+ private String dbName;
+
+ @ApiModelProperty("schema info")
+ private String schema;
+
+ @ApiModelProperty("Data table name list")
+ private List<String> tableNameList;
+
+ @ApiModelProperty("decoding plugin name")
+ private String decodingPluginName;
+
+ @ApiModelProperty("Primary key")
+ private String primaryKey;
+}
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongParser.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongParser.java
index 2b6e8742f..0b8eba701 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongParser.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongParser.java
@@ -41,6 +41,7 @@ import org.apache.inlong.manager.common.pojo.sink.ck.ClickHouseSinkResponse;
import org.apache.inlong.manager.common.pojo.sink.hive.HiveSinkResponse;
import org.apache.inlong.manager.common.pojo.sink.iceberg.IcebergSinkResponse;
import org.apache.inlong.manager.common.pojo.sink.kafka.KafkaSinkResponse;
+import org.apache.inlong.manager.common.pojo.sink.postgres.PostgresSinkResponse;
import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
import org.apache.inlong.manager.common.pojo.source.SourceResponse;
import org.apache.inlong.manager.common.pojo.source.autopush.AutoPushSourceListResponse;
@@ -52,6 +53,8 @@ import org.apache.inlong.manager.common.pojo.source.file.FileSourceListResponse;
import org.apache.inlong.manager.common.pojo.source.file.FileSourceResponse;
import org.apache.inlong.manager.common.pojo.source.kafka.KafkaSourceListResponse;
import org.apache.inlong.manager.common.pojo.source.kafka.KafkaSourceResponse;
+import org.apache.inlong.manager.common.pojo.source.postgres.PostgresSourceListResponse;
+import org.apache.inlong.manager.common.pojo.source.postgres.PostgresSourceResponse;
import org.apache.inlong.manager.common.pojo.stream.FullStreamResponse;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamApproveRequest;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamConfigLogListResponse;
@@ -169,6 +172,11 @@ public class InlongParser {
AutoPushSourceRequest.class);
sourceResponses.add(autoPushSourceResponse);
break;
+ case POSTGRES:
+ PostgresSourceResponse postgresSourceResponse = GsonUtil.fromJson(sourceJson.toString(),
+ PostgresSourceResponse.class);
+ sourceResponses.add(postgresSourceResponse);
+ break;
default:
throw new RuntimeException(String.format("Unsupported sourceType=%s for Inlong", sourceType));
}
@@ -203,6 +211,11 @@ public class InlongParser {
ClickHouseSinkResponse.class);
sinkResponses.add(clickHouseSinkResponse);
break;
+ case POSTGRES:
+ PostgresSinkResponse postgresSinkResponse = GsonUtil.fromJson(sinkJson.toString(),
+ PostgresSinkResponse.class);
+ sinkResponses.add(postgresSinkResponse);
+ break;
default:
throw new RuntimeException(String.format("Unsupported sinkType=%s for Inlong", sinkType));
}
@@ -240,6 +253,10 @@ public class InlongParser {
return GsonUtil.fromJson(pageInfoJson,
new TypeToken<PageInfo<AutoPushSourceListResponse>>() {
}.getType());
+ case POSTGRES:
+ return GsonUtil.fromJson(pageInfoJson,
+ new TypeToken<PageInfo<PostgresSourceListResponse>>() {
+ }.getType());
default:
throw new IllegalArgumentException(
String.format("Unsupported sourceType=%s for Inlong", sourceType));
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSinkTransfer.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSinkTransfer.java
index 1754dd970..21b543d50 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSinkTransfer.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSinkTransfer.java
@@ -26,6 +26,7 @@ import org.apache.inlong.manager.client.api.sink.HbaseSink;
import org.apache.inlong.manager.client.api.sink.HiveSink;
import org.apache.inlong.manager.client.api.sink.KafkaSink;
import org.apache.inlong.manager.common.auth.DefaultAuthentication;
+import org.apache.inlong.manager.client.api.sink.PostgresSink;
import org.apache.inlong.manager.common.enums.DataFormat;
import org.apache.inlong.manager.common.enums.FieldType;
import org.apache.inlong.manager.common.enums.FileFormat;
@@ -43,6 +44,8 @@ import org.apache.inlong.manager.common.pojo.sink.hive.HiveSinkRequest;
import org.apache.inlong.manager.common.pojo.sink.hive.HiveSinkResponse;
import org.apache.inlong.manager.common.pojo.sink.kafka.KafkaSinkRequest;
import org.apache.inlong.manager.common.pojo.sink.kafka.KafkaSinkResponse;
+import org.apache.inlong.manager.common.pojo.sink.postgres.PostgresSinkRequest;
+import org.apache.inlong.manager.common.pojo.sink.postgres.PostgresSinkResponse;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.common.pojo.stream.SinkField;
import org.apache.inlong.manager.common.pojo.stream.StreamSink;
@@ -69,6 +72,8 @@ public class InlongStreamSinkTransfer {
return createClickHouseRequest(streamSink, streamInfo);
case HBASE:
return createHbaseRequest(streamSink, streamInfo);
+ case POSTGRES:
+ return createPostgresRequest(streamSink, streamInfo);
default:
throw new IllegalArgumentException(String.format("Unsupported sink type : %s for Inlong", sinkType));
}
@@ -93,6 +98,8 @@ public class InlongStreamSinkTransfer {
return parseClickHouseSink((ClickHouseSinkResponse) sinkResponse, streamSink);
case HBASE:
return parseHbaseSink((HbaseSinkResponse) sinkResponse, streamSink);
+ case POSTGRES:
+ return parsePostgresSink((PostgresSinkResponse) sinkResponse, streamSink);
default:
throw new IllegalArgumentException(String.format("Unsupported sink type : %s for Inlong", sinkType));
}
@@ -432,4 +439,73 @@ public class InlongStreamSinkTransfer {
return hiveSink;
}
+ /**
+ * Create a Postgres sink request from the client-side sink definition.
+ *
+ * @param streamSink stream sink; must be a {@link PostgresSink} with its authentication set
+ * @param streamInfo stream the sink belongs to; supplies the group id and the stream id
+ * @return postgres sinkRequest
+ */
+ private static SinkRequest createPostgresRequest(StreamSink streamSink, InlongStreamInfo streamInfo) {
+ PostgresSink postgresSink = (PostgresSink) streamSink;
+ // The username and password are read from the authentication below, so report a missing
+ // authentication with a descriptive message instead of a bare NullPointerException.
+ DefaultAuthentication authentication = postgresSink.getAuthentication();
+ AssertUtils.isTrue(authentication != null,
+ String.format("Postgres sink:%s must be authenticated", postgresSink.getSinkName()));
+
+ PostgresSinkRequest postgresSinkRequest = new PostgresSinkRequest();
+ postgresSinkRequest.setJdbcUrl(postgresSink.getJdbcUrl());
+ postgresSinkRequest.setUsername(authentication.getUserName());
+ postgresSinkRequest.setPassword(authentication.getPassword());
+ postgresSinkRequest.setDbName(postgresSink.getDbName());
+ postgresSinkRequest.setTableName(postgresSink.getTableName());
+
+ postgresSinkRequest.setSinkName(postgresSink.getSinkName());
+ postgresSinkRequest.setSinkType(postgresSink.getSinkType().name());
+ postgresSinkRequest.setInlongGroupId(streamInfo.getInlongGroupId());
+ postgresSinkRequest.setInlongStreamId(streamInfo.getInlongStreamId());
+
+ postgresSinkRequest.setProperties(postgresSink.getProperties());
+ postgresSinkRequest.setPrimaryKey(postgresSink.getPrimaryKey());
+
+ // The field list is only set when the sink actually declares fields.
+ if (CollectionUtils.isNotEmpty(postgresSink.getSinkFields())) {
+ List<SinkFieldRequest> fieldRequests = createSinkFieldRequests(postgresSink.getSinkFields());
+ postgresSinkRequest.setFieldList(fieldRequests);
+ }
+ return postgresSinkRequest;
+ }
+
+ /**
+ * Parse a Postgres sink from the manager response.
+ *
+ * <p>When a local {@code sink} is supplied, its name must equal the response's name, and the
+ * connection settings (authentication, JDBC URL, table, database) are taken from it;
+ * otherwise they are taken from the response. Primary key, properties and field list always
+ * come from the response.
+ *
+ * @param sinkResponse sinkResponse
+ * @param sink locally held sink, may be null
+ * @return postgres streamSink
+ */
+ private static StreamSink parsePostgresSink(PostgresSinkResponse sinkResponse, StreamSink sink) {
+ PostgresSink postgresSink = new PostgresSink();
+ if (sink != null) {
+ // Report only the two names: formatting the whole objects would dump every field,
+ // including the authentication held by a PostgresSink, into the error message.
+ AssertUtils.isTrue(sinkResponse.getSinkName().equals(sink.getSinkName()),
+ String.format("SinkName is not equal: %s != %s",
+ sinkResponse.getSinkName(), sink.getSinkName()));
+ PostgresSink snapshot = (PostgresSink) sink;
+
+ postgresSink.setSinkName(snapshot.getSinkName());
+ postgresSink.setAuthentication(snapshot.getAuthentication());
+
+ postgresSink.setJdbcUrl(snapshot.getJdbcUrl());
+ postgresSink.setTableName(snapshot.getTableName());
+ postgresSink.setDbName(snapshot.getDbName());
+ } else {
+ postgresSink.setSinkName(sinkResponse.getSinkName());
+ String password = sinkResponse.getPassword();
+ String uname = sinkResponse.getUsername();
+ postgresSink.setAuthentication(new DefaultAuthentication(uname, password));
+
+ postgresSink.setJdbcUrl(sinkResponse.getJdbcUrl());
+ postgresSink.setTableName(sinkResponse.getTableName());
+ postgresSink.setDbName(sinkResponse.getDbName());
+ }
+ postgresSink.setPrimaryKey(sinkResponse.getPrimaryKey());
+ postgresSink.setProperties(sinkResponse.getProperties());
+ postgresSink.setSinkType(SinkType.POSTGRES);
+ if (CollectionUtils.isNotEmpty(sinkResponse.getFieldList())) {
+ postgresSink.setSinkFields(convertToSinkFields(sinkResponse.getFieldList()));
+ }
+ return postgresSink;
+ }
}
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java
index 62cf20067..c051d671b 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java
@@ -25,6 +25,7 @@ import org.apache.inlong.manager.client.api.source.AgentFileSource;
import org.apache.inlong.manager.client.api.source.AutoPushSource;
import org.apache.inlong.manager.client.api.source.KafkaSource;
import org.apache.inlong.manager.client.api.source.MySQLBinlogSource;
+import org.apache.inlong.manager.client.api.source.PostgresSource;
import org.apache.inlong.manager.common.enums.DataFormat;
import org.apache.inlong.manager.common.enums.SourceType;
import org.apache.inlong.manager.common.pojo.source.SourceRequest;
@@ -37,6 +38,8 @@ import org.apache.inlong.manager.common.pojo.source.file.FileSourceRequest;
import org.apache.inlong.manager.common.pojo.source.file.FileSourceResponse;
import org.apache.inlong.manager.common.pojo.source.kafka.KafkaSourceRequest;
import org.apache.inlong.manager.common.pojo.source.kafka.KafkaSourceResponse;
+import org.apache.inlong.manager.common.pojo.source.postgres.PostgresSourceRequest;
+import org.apache.inlong.manager.common.pojo.source.postgres.PostgresSourceResponse;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.common.pojo.stream.StreamSource;
import org.apache.inlong.manager.common.pojo.stream.StreamSource.State;
@@ -60,6 +63,8 @@ public class InlongStreamSourceTransfer {
return createFileSourceRequest((AgentFileSource) streamSource, streamInfo);
case AUTO_PUSH:
return createAutoPushSourceRequest((AutoPushSource) streamSource, streamInfo);
+ case POSTGRES:
+ return createPostgresSourceRequest((PostgresSource) streamSource, streamInfo);
default:
throw new RuntimeException(String.format("Unsupported source=%s for Inlong", sourceType));
}
@@ -80,6 +85,9 @@ public class InlongStreamSourceTransfer {
if (sourceType == SourceType.AUTO_PUSH && sourceResponse instanceof AutoPushSourceResponse) {
return parseAutoPushSource((AutoPushSourceResponse) sourceResponse);
}
+ // Match on the Postgres response type: it is the type the branch casts to.
+ if (sourceType == SourceType.POSTGRES && sourceResponse instanceof PostgresSourceResponse) {
+ return parsePostgresSource((PostgresSourceResponse) sourceResponse);
+ }
throw new IllegalArgumentException(String.format("Unsupported source type : %s for Inlong", sourceType));
}
@@ -158,6 +166,26 @@ public class InlongStreamSourceTransfer {
return autoPushSource;
}
+ /**
+ * Parse a Postgres source from the manager response.
+ *
+ * <p>The data format is always set to {@link DataFormat#NONE}; the state is derived from the
+ * response status.
+ *
+ * @param response Postgres source response
+ * @return the client-side Postgres source
+ */
+ private static PostgresSource parsePostgresSource(PostgresSourceResponse response) {
+ PostgresSource postgresSource = new PostgresSource();
+ postgresSource.setSourceName(response.getSourceName());
+ postgresSource.setState(State.parseByStatus(response.getStatus()));
+ postgresSource.setDataFormat(DataFormat.NONE);
+ postgresSource.setFields(InlongStreamTransfer.parseStreamFields(response.getFieldList()));
+
+ postgresSource.setDbName(response.getDatabase());
+ postgresSource.setDecodingPluginName(response.getDecodingPluginName());
+ postgresSource.setHostname(response.getHostname());
+ postgresSource.setPassword(response.getPassword());
+ postgresSource.setPort(response.getPort());
+ postgresSource.setSchema(response.getSchema());
+ postgresSource.setTableNameList(response.getTableNames());
+ postgresSource.setUsername(response.getUsername());
+ // NOTE(review): primaryKey is sent in the source request but is not copied back here;
+ // confirm whether PostgresSourceResponse exposes it and whether it should be restored.
+
+ return postgresSource;
+ }
+
private static KafkaSourceRequest createKafkaSourceRequest(KafkaSource kafkaSource, InlongStreamInfo streamInfo) {
KafkaSourceRequest sourceRequest = new KafkaSourceRequest();
sourceRequest.setSourceName(kafkaSource.getSourceName());
@@ -250,4 +278,28 @@ public class InlongStreamSourceTransfer {
sourceRequest.setFieldList(InlongStreamTransfer.createStreamFields(source.getFields(), streamInfo));
return sourceRequest;
}
+
+ /**
+ * Build the Postgres source request for the given client-side source and stream.
+ *
+ * @param source client-side Postgres source definition
+ * @param streamInfo stream the source belongs to; supplies the group id, the stream id and
+ * the fallback source name
+ * @return the populated source request
+ */
+ private static PostgresSourceRequest createPostgresSourceRequest(PostgresSource source,
+ InlongStreamInfo streamInfo) {
+ final PostgresSourceRequest request = new PostgresSourceRequest();
+
+ // Fall back to the stream name when the source carries no name of its own.
+ request.setSourceName(source.getSourceName());
+ if (StringUtils.isEmpty(request.getSourceName())) {
+ request.setSourceName(streamInfo.getName());
+ }
+
+ // Identity of the owning group and stream, and the source type.
+ request.setInlongGroupId(streamInfo.getInlongGroupId());
+ request.setInlongStreamId(streamInfo.getInlongStreamId());
+ request.setSourceType(source.getSourceType().getType());
+
+ // Connection settings.
+ request.setHostname(source.getHostname());
+ request.setPort(source.getPort());
+ request.setUsername(source.getUsername());
+ request.setPassword(source.getPassword());
+
+ // What to read and how to decode it.
+ request.setDatabase(source.getDbName());
+ request.setSchema(source.getSchema());
+ request.setTableNameList(source.getTableNameList());
+ request.setPrimaryKey(source.getPrimaryKey());
+ request.setDecodingPluginName(source.getDecodingPluginName());
+
+ request.setFieldList(InlongStreamTransfer.createStreamFields(source.getFields(), streamInfo));
+ return request;
+ }
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SinkType.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SinkType.java
index df7afb60a..e308eb53f 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SinkType.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SinkType.java
@@ -21,13 +21,14 @@ import java.util.Locale;
public enum SinkType {
- HIVE, KAFKA, ICEBERG, CLICKHOUSE, HBASE;
+ HIVE, KAFKA, ICEBERG, CLICKHOUSE, HBASE, POSTGRES;
public static final String SINK_HIVE = "HIVE";
public static final String SINK_KAFKA = "KAFKA";
public static final String SINK_HBASE = "HBASE";
public static final String SINK_ICEBERG = "ICEBERG";
public static final String SINK_CLICKHOUSE = "CLICKHOUSE";
+ public static final String SINK_POSTGRES = "POSTGRES";
/**
* Get the SinkType enum via the given sinkType string
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceType.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceType.java
index 524de69e8..848d978a5 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceType.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceType.java
@@ -32,7 +32,8 @@ public enum SourceType {
SQL("SQL", TaskTypeEnum.SQL),
BINLOG("BINLOG", TaskTypeEnum.BINLOG),
KAFKA("KAFKA", TaskTypeEnum.KAFKA),
- PULSAR("PULSAR", TaskTypeEnum.PULSAR);
+ PULSAR("PULSAR", TaskTypeEnum.PULSAR),
+ POSTGRES("POSTGRES", TaskTypeEnum.POSTGRES);
public static final String SOURCE_AUTO_PUSH = "AUTO_PUSH";
public static final String SOURCE_FILE = "FILE";
@@ -40,6 +41,7 @@ public enum SourceType {
public static final String SOURCE_BINLOG = "BINLOG";
public static final String SOURCE_KAFKA = "KAFKA";
public static final String SOURCE_PULSAR = "PULSAR";
+ public static final String SOURCE_POSTGRES = "POSTGRES";
@Getter
private final String type;
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/postgres/PostgresSinkDTO.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/postgres/PostgresSinkDTO.java
new file mode 100644
index 000000000..69f1f5261
--- /dev/null
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/postgres/PostgresSinkDTO.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.pojo.sink.postgres;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.swagger.annotations.ApiModelProperty;
+import java.util.Map;
+import javax.validation.constraints.NotNull;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+
+/**
+ * Postgres sink info.
+ *
+ * <p>Holds the Postgres-specific sink settings. An instance is built either from a
+ * {@link PostgresSinkRequest} via {@link #getFromRequest} or from a JSON string via
+ * {@link #getFromJson}.
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class PostgresSinkDTO {
+
+ // Configured once at class initialisation and never reconfigured afterwards, so the shared
+ // instance is not mutated while other threads may be reading with it.
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
+ .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+
+ @ApiModelProperty("postgres JDBC URL")
+ private String jdbcUrl;
+
+ @ApiModelProperty("Username for JDBC URL")
+ private String username;
+
+ @ApiModelProperty("User password")
+ private String password;
+
+ @ApiModelProperty("Target database name")
+ private String dbName;
+
+ @ApiModelProperty("Target table name")
+ private String tableName;
+
+ @ApiModelProperty("Primary key")
+ private String primaryKey;
+
+ @ApiModelProperty("Properties for postgres")
+ private Map<String, Object> properties;
+
+ /**
+ * Get the dto instance from the request
+ *
+ * @param request Postgres sink request to copy the settings from
+ * @return a DTO carrying the request's JDBC URL, credentials, database, table, primary key
+ * and properties
+ */
+ public static PostgresSinkDTO getFromRequest(PostgresSinkRequest request) {
+ return PostgresSinkDTO.builder()
+ .jdbcUrl(request.getJdbcUrl())
+ .username(request.getUsername())
+ .password(request.getPassword())
+ .dbName(request.getDbName())
+ .primaryKey(request.getPrimaryKey())
+ .tableName(request.getTableName())
+ .properties(request.getProperties())
+ .build();
+ }
+
+ /**
+ * Get the DTO from its JSON form; properties unknown to this class are ignored.
+ *
+ * @param extParams JSON string holding the sink settings
+ * @return postgres sink DTO
+ * @throws BusinessException with the {@code SINK_INFO_INCORRECT} message when the JSON
+ * cannot be read into a DTO
+ */
+ public static PostgresSinkDTO getFromJson(@NotNull String extParams) {
+ try {
+ return OBJECT_MAPPER.readValue(extParams, PostgresSinkDTO.class);
+ } catch (Exception e) {
+ // Every failure is reported with the same generic message; the cause is not attached.
+ throw new BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage());
+ }
+ }
+
+}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/postgres/PostgresSinkListResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/postgres/PostgresSinkListResponse.java
new file mode 100644
index 000000000..c67210591
--- /dev/null
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/postgres/PostgresSinkListResponse.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.pojo.sink.postgres;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import org.apache.inlong.manager.common.pojo.sink.SinkListResponse;
+
+/**
+ * Response of Postgres sink list.
+ *
+ * <p>Extends {@link SinkListResponse} with the Postgres-specific settings: credentials,
+ * JDBC URL, target database and table, and primary key.
+ */
+@Data
+@EqualsAndHashCode(callSuper = true)
+@ApiModel("Response of Postgres sink paging list")
+public class PostgresSinkListResponse extends SinkListResponse {
+
+ @ApiModelProperty("Username for JDBC URL")
+ private String username;
+
+ // NOTE(review): the password is part of the list response as a plain string field;
+ // confirm that returning it to list callers is intended.
+ @ApiModelProperty("User password")
+ private String password;
+
+ @ApiModelProperty("postgres jdbc url")
+ private String jdbcUrl;
+
+ @ApiModelProperty("Target database name")
+ private String dbName;
+
+ @ApiModelProperty("Target table name")
+ private String tableName;
+
+ @ApiModelProperty("Primary key")
+ private String primaryKey;
+}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/postgres/PostgresSinkRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/postgres/PostgresSinkRequest.java
new file mode 100644
index 000000000..2576622b2
--- /dev/null
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/postgres/PostgresSinkRequest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.pojo.sink.postgres;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.apache.inlong.manager.common.enums.SinkType;
+import org.apache.inlong.manager.common.pojo.sink.SinkRequest;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+
+/**
+ * Request of the Postgres sink info
+ *
+ * <p>Extends the common {@link SinkRequest} with the JDBC connection settings and the target
+ * database/table of a Postgres sink. Lombok generates the accessors; {@code toString},
+ * {@code equals} and {@code hashCode} also call the superclass implementation
+ * ({@code callSuper = true}).
+ */
+@Data
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@ApiModel(value = "Request of the Postgres sink info")
+// NOTE(review): presumably binds this class to the SINK_POSTGRES sink type for polymorphic JSON
+// handling of SinkRequest - confirm against JsonTypeDefine.
+@JsonTypeDefine(value = SinkType.SINK_POSTGRES)
+public class PostgresSinkRequest extends SinkRequest {
+
+ @ApiModelProperty("Postgres JDBC URL")
+ private String jdbcUrl;
+
+ @ApiModelProperty("Username for JDBC URL")
+ private String username;
+
+ @ApiModelProperty("User password")
+ private String password;
+
+ @ApiModelProperty("Target database name")
+ private String dbName;
+
+ @ApiModelProperty("Target table name")
+ private String tableName;
+
+ @ApiModelProperty("Primary key is required when serializationType is json, avro")
+ private String primaryKey;
+}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/postgres/PostgresSinkResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/postgres/PostgresSinkResponse.java
new file mode 100644
index 000000000..707265594
--- /dev/null
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/postgres/PostgresSinkResponse.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.pojo.sink.postgres;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.apache.inlong.manager.common.enums.SinkType;
+import org.apache.inlong.manager.common.pojo.sink.SinkResponse;
+
+/**
+ * Response of the postgres sink
+ *
+ * <p>Extends the common {@link SinkResponse} with the JDBC connection settings and the target
+ * database/table of a Postgres sink. Lombok generates the accessors; {@code toString},
+ * {@code equals} and {@code hashCode} also call the superclass implementation
+ * ({@code callSuper = true}).
+ */
+@Data
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@ApiModel(value = "Response of the postgres sink")
+public class PostgresSinkResponse extends SinkResponse {
+
+ @ApiModelProperty("postgres jdbc url")
+ private String jdbcUrl;
+
+ @ApiModelProperty("Username for JDBC URL")
+ private String username;
+
+ // NOTE(review): the password is a plain property of an API response; confirm it is masked or
+ // omitted before this object is returned to API clients.
+ @ApiModelProperty("User password")
+ private String password;
+
+ @ApiModelProperty("Target database name")
+ private String dbName;
+
+ @ApiModelProperty("Target table name")
+ private String tableName;
+
+ @ApiModelProperty("Primary key is required when serializationType is json, avro")
+ private String primaryKey;
+
+ /**
+ * Create a response whose inherited sink type is preset to the Postgres sink type.
+ */
+ public PostgresSinkResponse() {
+ this.sinkType = SinkType.SINK_POSTGRES;
+ }
+}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/postgres/PostgresSourceDTO.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/postgres/PostgresSourceDTO.java
new file mode 100644
index 000000000..7d899b0c8
--- /dev/null
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/postgres/PostgresSourceDTO.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.pojo.source.postgres;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.swagger.annotations.ApiModelProperty;
+import java.util.List;
+import javax.validation.constraints.NotNull;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+
+/**
+ * Postgres source info
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class PostgresSourceDTO {
+
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); // thread safe
+
+ @ApiModelProperty("Username of the DB server")
+ private String username;
+
+ @ApiModelProperty("Password of the DB server")
+ private String password;
+
+ @ApiModelProperty("Hostname of the DB server")
+ private String hostname;
+
+ @ApiModelProperty("Exposed port of the DB server")
+ private int port;
+
+ @ApiModelProperty("schema")
+ private String schema;
+
+ @ApiModelProperty(value = "database name")
+ private String database;
+
+ @ApiModelProperty(value = "List of tables to be collected")
+ private List<String> tableNameList;
+
+ @ApiModelProperty(value = "Primary key must be shared by all tables", required = false)
+ private String primaryKey;
+
+ @ApiModelProperty(value = "decoding pulgin name")
+ private String decodingPluginName;
+
+ /**
+ * Get the dto instance from the request
+ */
+ public static PostgresSourceDTO getFromRequest(PostgresSourceRequest request) {
+ return PostgresSourceDTO.builder()
+ .username(request.getUsername())
+ .password(request.getPassword())
+ .hostname(request.getHostname())
+ .port(request.getPort())
+ .schema(request.getSchema())
+ .database(request.getDatabase())
+ .tableNameList(request.getTableNameList())
+ .primaryKey(request.getPrimaryKey())
+ .decodingPluginName(request.getDecodingPluginName())
+ .build();
+ }
+
+ public static PostgresSourceDTO getFromJson(@NotNull String extParams) {
+ try {
+ OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+ return OBJECT_MAPPER.readValue(extParams, PostgresSourceDTO.class);
+ } catch (Exception e) {
+ throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage());
+ }
+ }
+
+}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/postgres/PostgresSourceListResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/postgres/PostgresSourceListResponse.java
new file mode 100644
index 000000000..429454a32
--- /dev/null
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/postgres/PostgresSourceListResponse.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.pojo.source.postgres;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import java.util.List;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import org.apache.inlong.manager.common.enums.SourceType;
+import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
+
+/**
+ * Response info of postgres source list.
+ *
+ * <p>Extends the common {@link SourceListResponse} with the connection settings and capture
+ * options of a Postgres source. Lombok generates the accessors; the generated
+ * {@code equals}/{@code hashCode} also call the superclass implementation
+ * ({@code callSuper = true}).
+ */
+@Data
+@EqualsAndHashCode(callSuper = true)
+@ApiModel("Response of postgres source paging list")
+public class PostgresSourceListResponse extends SourceListResponse {
+
+    @ApiModelProperty("Primary key")
+    private String primaryKey;
+
+    @ApiModelProperty("Username of the DB server")
+    private String username;
+
+    // NOTE(review): the password is a plain property of a paging-list response; confirm it is
+    // masked or omitted before this object is returned to API clients.
+    @ApiModelProperty("Password of the DB server")
+    private String password;
+
+    @ApiModelProperty("Hostname of the DB server")
+    private String hostname;
+
+    @ApiModelProperty("Exposed port of the DB server")
+    private int port;
+
+    @ApiModelProperty("Exposed database of the DB")
+    private String database;
+
+    @ApiModelProperty("schema info")
+    private String schema;
+
+    @ApiModelProperty("decoding plugin name")
+    private String decodingPluginName;
+
+    // NOTE(review): PostgresSourceRequest and PostgresSourceDTO call this property
+    // "tableNameList"; a name-based bean copy would not fill this field - confirm.
+    @ApiModelProperty("List of tables")
+    private List<String> tableNames;
+
+    /**
+     * Create a list response whose source type is preset to the Postgres source type.
+     */
+    public PostgresSourceListResponse() {
+        this.setSourceType(SourceType.POSTGRES.getType());
+    }
+}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/postgres/PostgresSourceRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/postgres/PostgresSourceRequest.java
new file mode 100644
index 000000000..825b9ad8b
--- /dev/null
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/postgres/PostgresSourceRequest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.pojo.source.postgres;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import java.util.List;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.apache.inlong.manager.common.enums.SourceType;
+import org.apache.inlong.manager.common.pojo.source.SourceRequest;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+
+/**
+ * Request of postgres source.
+ *
+ * <p>Extends the common {@link SourceRequest} with the connection settings and capture options
+ * of a Postgres source. Lombok generates the accessors; {@code toString}, {@code equals} and
+ * {@code hashCode} also call the superclass implementation ({@code callSuper = true}).
+ */
+@Data
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@ApiModel(value = "Request of the postgres source info")
+// NOTE(review): presumably binds this class to the SOURCE_POSTGRES source type for polymorphic
+// JSON handling of SourceRequest - confirm against JsonTypeDefine.
+@JsonTypeDefine(value = SourceType.SOURCE_POSTGRES)
+public class PostgresSourceRequest extends SourceRequest {
+
+    @ApiModelProperty("Primary key")
+    private String primaryKey;
+
+    @ApiModelProperty("Username of the DB server")
+    private String username;
+
+    @ApiModelProperty("Password of the DB server")
+    private String password;
+
+    @ApiModelProperty("Hostname of the DB server")
+    private String hostname;
+
+    @ApiModelProperty("Exposed port of the DB server")
+    private int port;
+
+    @ApiModelProperty("Exposed database of the DB")
+    private String database;
+
+    @ApiModelProperty("schema info")
+    private String schema;
+
+    @ApiModelProperty("decoding plugin name")
+    private String decodingPluginName;
+
+    @ApiModelProperty("List of tables")
+    private List<String> tableNameList;
+
+    /**
+     * Create a request whose source type is preset to the Postgres source type.
+     */
+    public PostgresSourceRequest() {
+        // NOTE(review): PostgresSourceListResponse uses SourceType.POSTGRES.getType() for the
+        // same purpose; confirm that toString() yields the same value.
+        this.setSourceType(SourceType.POSTGRES.toString());
+    }
+
+}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/postgres/PostgresSourceResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/postgres/PostgresSourceResponse.java
new file mode 100644
index 000000000..b1d64d010
--- /dev/null
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/postgres/PostgresSourceResponse.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.pojo.source.postgres;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import java.util.List;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.apache.inlong.manager.common.pojo.source.SourceResponse;
+
+/**
+ * Response of the postgres source.
+ *
+ * <p>Extends the common {@link SourceResponse} with the connection settings and capture options
+ * of a Postgres source. Lombok generates the accessors; {@code toString}, {@code equals} and
+ * {@code hashCode} also call the superclass implementation ({@code callSuper = true}).
+ */
+@Data
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@ApiModel(value = "Response of the postgres source")
+public class PostgresSourceResponse extends SourceResponse {
+
+    @ApiModelProperty("Primary key")
+    private String primaryKey;
+
+    @ApiModelProperty("Username of the DB server")
+    private String username;
+
+    // NOTE(review): the password is a plain property of an API response; confirm it is masked or
+    // omitted before this object is returned to API clients.
+    @ApiModelProperty("Password of the DB server")
+    private String password;
+
+    @ApiModelProperty("Hostname of the DB server")
+    private String hostname;
+
+    @ApiModelProperty("Exposed port of the DB server")
+    private int port;
+
+    @ApiModelProperty("Exposed database of the DB")
+    private String database;
+
+    @ApiModelProperty("schema info")
+    private String schema;
+
+    @ApiModelProperty("decoding plugin name")
+    private String decodingPluginName;
+
+    // NOTE(review): PostgresSourceRequest and PostgresSourceDTO call this property
+    // "tableNameList". If this response is filled by a name-based bean copy from the DTO, this
+    // field would stay null - confirm against the code that builds the response.
+    @ApiModelProperty("List of tables")
+    private List<String> tableNames;
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/postgres/PostgresSinkOperation.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/postgres/PostgresSinkOperation.java
new file mode 100644
index 000000000..60c4e7d85
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/postgres/PostgresSinkOperation.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.sink.postgres;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.github.pagehelper.Page;
+import com.github.pagehelper.PageInfo;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.function.Supplier;
+import javax.validation.constraints.NotNull;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.enums.GlobalConstants;
+import org.apache.inlong.manager.common.enums.SinkStatus;
+import org.apache.inlong.manager.common.enums.SinkType;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.pojo.sink.SinkFieldRequest;
+import org.apache.inlong.manager.common.pojo.sink.SinkFieldResponse;
+import org.apache.inlong.manager.common.pojo.sink.SinkListResponse;
+import org.apache.inlong.manager.common.pojo.sink.SinkRequest;
+import org.apache.inlong.manager.common.pojo.sink.SinkResponse;
+import org.apache.inlong.manager.common.pojo.sink.postgres.PostgresSinkDTO;
+import org.apache.inlong.manager.common.pojo.sink.postgres.PostgresSinkListResponse;
+import org.apache.inlong.manager.common.pojo.sink.postgres.PostgresSinkRequest;
+import org.apache.inlong.manager.common.pojo.sink.postgres.PostgresSinkResponse;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
+import org.apache.inlong.manager.dao.entity.StreamSinkFieldEntity;
+import org.apache.inlong.manager.dao.mapper.StreamSinkEntityMapper;
+import org.apache.inlong.manager.dao.mapper.StreamSinkFieldEntityMapper;
+import org.apache.inlong.manager.service.sink.StreamSinkOperation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+/**
+ * Postgres sink operation.
+ *
+ * <p>{@link StreamSinkOperation} implementation for sinks of type {@link SinkType#POSTGRES}.
+ * The Postgres-specific parameters of a sink are serialized as JSON into the entity's
+ * {@code extParams} column (see {@link PostgresSinkDTO}); the sink fields are stored through
+ * {@link StreamSinkFieldEntityMapper}.
+ */
+@Service
+public class PostgresSinkOperation implements StreamSinkOperation {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(PostgresSinkOperation.class);
+
+    @Autowired
+    private ObjectMapper objectMapper;
+    @Autowired
+    private StreamSinkEntityMapper sinkMapper;
+    @Autowired
+    private StreamSinkFieldEntityMapper sinkFieldMapper;
+
+    /**
+     * Tell whether this operation handles the given sink type.
+     *
+     * @param sinkType sink type to check
+     * @return true only for {@link SinkType#POSTGRES}
+     */
+    @Override
+    public Boolean accept(SinkType sinkType) {
+        return SinkType.POSTGRES.equals(sinkType);
+    }
+
+    /**
+     * Save a new Postgres sink together with its fields.
+     *
+     * @param request sink request; its sink type must be {@code SinkType.SINK_POSTGRES} and it is
+     *         cast to {@link PostgresSinkRequest}
+     * @param operator name recorded as creator and modifier of the new entity
+     * @return id of the inserted sink entity (also written back into the request)
+     * @throws BusinessException if the Postgres-specific parameters cannot be serialized
+     */
+    @Override
+    public Integer saveOpt(SinkRequest request, String operator) {
+        String sinkType = request.getSinkType();
+        Preconditions.checkTrue(SinkType.SINK_POSTGRES.equals(sinkType),
+                ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT.getMessage() + ": " + sinkType);
+
+        PostgresSinkRequest postgresSinkRequest = (PostgresSinkRequest) request;
+        StreamSinkEntity entity = CommonBeanUtils.copyProperties(postgresSinkRequest, StreamSinkEntity::new);
+        entity.setStatus(SinkStatus.NEW.getCode());
+        entity.setIsDeleted(GlobalConstants.UN_DELETED);
+        entity.setCreator(operator);
+        entity.setModifier(operator);
+        Date now = new Date();
+        entity.setCreateTime(now);
+        entity.setModifyTime(now);
+
+        // get the ext params
+        PostgresSinkDTO dto = PostgresSinkDTO.getFromRequest(postgresSinkRequest);
+        try {
+            entity.setExtParams(objectMapper.writeValueAsString(dto));
+        } catch (Exception e) {
+            // Log the cause before translating it; the DTO itself is not logged because it
+            // carries the password.
+            LOGGER.error("failed to serialize ext params of postgres sink", e);
+            throw new BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED);
+        }
+        sinkMapper.insert(entity);
+        Integer sinkId = entity.getId();
+        request.setId(sinkId);
+        this.saveFieldOpt(request);
+        return sinkId;
+    }
+
+    /**
+     * Save the fields of the sink described by the request; does nothing when the request has
+     * no fields.
+     *
+     * @param request sink request providing the field list, group id, stream id, sink type and
+     *         sink id that are written into every field entity
+     */
+    @Override
+    public void saveFieldOpt(SinkRequest request) {
+        List<SinkFieldRequest> fieldList = request.getFieldList();
+        LOGGER.info("begin to save field={}", fieldList);
+        if (CollectionUtils.isEmpty(fieldList)) {
+            return;
+        }
+
+        int size = fieldList.size();
+        List<StreamSinkFieldEntity> entityList = new ArrayList<>(size);
+        String groupId = request.getInlongGroupId();
+        String streamId = request.getInlongStreamId();
+        String sinkType = request.getSinkType();
+        Integer sinkId = request.getId();
+        for (SinkFieldRequest fieldInfo : fieldList) {
+            StreamSinkFieldEntity fieldEntity = CommonBeanUtils.copyProperties(fieldInfo, StreamSinkFieldEntity::new);
+            // A field without a comment uses its own name as the comment.
+            if (StringUtils.isEmpty(fieldEntity.getFieldComment())) {
+                fieldEntity.setFieldComment(fieldEntity.getFieldName());
+            }
+            fieldEntity.setInlongGroupId(groupId);
+            fieldEntity.setInlongStreamId(streamId);
+            fieldEntity.setSinkType(sinkType);
+            fieldEntity.setSinkId(sinkId);
+            fieldEntity.setIsDeleted(GlobalConstants.UN_DELETED);
+            entityList.add(fieldEntity);
+        }
+
+        sinkFieldMapper.insertAll(entityList);
+        LOGGER.info("success to save field");
+    }
+
+    /**
+     * Build the full sink response, including its fields, from a sink entity.
+     *
+     * @param entity sink entity; must not be null and must be of the Postgres sink type
+     * @return Postgres sink response with the field list loaded by sink id
+     */
+    @Override
+    public SinkResponse getByEntity(@NotNull StreamSinkEntity entity) {
+        Preconditions.checkNotNull(entity, ErrorCodeEnum.SINK_INFO_NOT_FOUND.getMessage());
+        String existType = entity.getSinkType();
+        Preconditions.checkTrue(SinkType.SINK_POSTGRES.equals(existType),
+                String.format(ErrorCodeEnum.SINK_TYPE_NOT_SAME.getMessage(), SinkType.SINK_POSTGRES, existType));
+        SinkResponse response = this.getFromEntity(entity, PostgresSinkResponse::new);
+        List<StreamSinkFieldEntity> entities = sinkFieldMapper.selectBySinkId(entity.getId());
+        List<SinkFieldResponse> infos = CommonBeanUtils.copyListProperties(entities, SinkFieldResponse::new);
+        response.setFieldList(infos);
+        return response;
+    }
+
+    /**
+     * Populate a new target object from a sink entity and the Postgres parameters stored in the
+     * entity's ext params.
+     *
+     * @param entity sink entity; when null, the freshly created target is returned untouched
+     * @param target supplier of the object to populate
+     * @param <T> type of the populated object
+     * @return the populated target
+     */
+    @Override
+    public <T> T getFromEntity(StreamSinkEntity entity, Supplier<T> target) {
+        T result = target.get();
+        if (entity == null) {
+            return result;
+        }
+        String existType = entity.getSinkType();
+        Preconditions.checkTrue(SinkType.SINK_POSTGRES.equals(existType),
+                String.format(ErrorCodeEnum.SINK_TYPE_NOT_SAME.getMessage(), SinkType.SINK_POSTGRES, existType));
+
+        PostgresSinkDTO dto = PostgresSinkDTO.getFromJson(entity.getExtParams());
+        // Entity properties are copied first, then the Postgres-specific ones from the DTO.
+        CommonBeanUtils.copyProperties(entity, result, true);
+        CommonBeanUtils.copyProperties(dto, result, true);
+
+        return result;
+    }
+
+    /**
+     * Convert a page of sink entities into a page of Postgres sink list responses.
+     *
+     * @param entityPage page of sink entities
+     * @return page info of list responses, or an empty page info when the page is empty
+     */
+    @Override
+    public PageInfo<? extends SinkListResponse> getPageInfo(Page<StreamSinkEntity> entityPage) {
+        if (CollectionUtils.isEmpty(entityPage)) {
+            return new PageInfo<>();
+        }
+        return entityPage.toPageInfo(entity -> this.getFromEntity(entity, PostgresSinkListResponse::new));
+    }
+
+    /**
+     * Update an existing Postgres sink and its fields.
+     *
+     * <p>The entity's current status is kept as previous status and the status is switched to
+     * {@code CONFIG_ING}. When the previous status was {@code CONFIG_SUCCESSFUL}, the fields
+     * are updated in "only add" mode (see {@link #updateFieldOpt(Boolean, SinkRequest)}).
+     *
+     * @param request sink request; its sink type must be {@code SinkType.SINK_POSTGRES} and it is
+     *         cast to {@link PostgresSinkRequest}
+     * @param operator name recorded as modifier
+     * @throws BusinessException if the Postgres-specific parameters cannot be serialized
+     */
+    @Override
+    public void updateOpt(SinkRequest request, String operator) {
+        String sinkType = request.getSinkType();
+        Preconditions.checkTrue(SinkType.SINK_POSTGRES.equals(sinkType),
+                String.format(ErrorCodeEnum.SINK_TYPE_NOT_SAME.getMessage(), SinkType.SINK_POSTGRES, sinkType));
+
+        StreamSinkEntity entity = sinkMapper.selectByPrimaryKey(request.getId());
+        Preconditions.checkNotNull(entity, ErrorCodeEnum.SINK_INFO_NOT_FOUND.getMessage());
+        PostgresSinkRequest postgresSinkRequest = (PostgresSinkRequest) request;
+        CommonBeanUtils.copyProperties(postgresSinkRequest, entity, true);
+        try {
+            PostgresSinkDTO dto = PostgresSinkDTO.getFromRequest(postgresSinkRequest);
+            entity.setExtParams(objectMapper.writeValueAsString(dto));
+        } catch (Exception e) {
+            // Log the cause before translating it; the DTO itself is not logged because it
+            // carries the password.
+            LOGGER.error("failed to serialize ext params of postgres sink", e);
+            throw new BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage());
+        }
+
+        entity.setPreviousStatus(entity.getStatus());
+        entity.setStatus(SinkStatus.CONFIG_ING.getCode());
+        entity.setModifier(operator);
+        entity.setModifyTime(new Date());
+        sinkMapper.updateByPrimaryKeySelective(entity);
+
+        boolean onlyAdd = SinkStatus.CONFIG_SUCCESSFUL.getCode().equals(entity.getPreviousStatus());
+        this.updateFieldOpt(onlyAdd, postgresSinkRequest);
+
+        LOGGER.info("success to update sink of type={}", sinkType);
+    }
+
+    /**
+     * Replace the stored fields of a sink with the fields of the request; does nothing when the
+     * request has no fields.
+     *
+     * @param onlyAdd when true, the stored fields must match the leading requested fields by
+     *         name and position, i.e. fields may only be appended; null is treated as false
+     * @param request sink request providing the sink id and the new field list
+     * @throws BusinessException if {@code onlyAdd} is true and a stored field was removed or
+     *         renamed
+     */
+    @Override
+    public void updateFieldOpt(Boolean onlyAdd, SinkRequest request) {
+        Integer sinkId = request.getId();
+        List<SinkFieldRequest> fieldRequestList = request.getFieldList();
+        if (CollectionUtils.isEmpty(fieldRequestList)) {
+            return;
+        }
+        // Boolean.TRUE.equals avoids a NullPointerException from unboxing a null flag.
+        if (Boolean.TRUE.equals(onlyAdd)) {
+            List<StreamSinkFieldEntity> existsFieldList = sinkFieldMapper.selectBySinkId(sinkId);
+            if (existsFieldList.size() > fieldRequestList.size()) {
+                throw new BusinessException(ErrorCodeEnum.SINK_FIELD_UPDATE_NOT_ALLOWED);
+            }
+            for (int i = 0; i < existsFieldList.size(); i++) {
+                if (!existsFieldList.get(i).getFieldName().equals(fieldRequestList.get(i).getFieldName())) {
+                    throw new BusinessException(ErrorCodeEnum.SINK_FIELD_UPDATE_NOT_ALLOWED);
+                }
+            }
+        }
+        // First physically delete the existing fields
+        sinkFieldMapper.deleteAll(sinkId);
+        // Then batch save the sink fields
+        this.saveFieldOpt(request);
+        LOGGER.info("success to update field");
+    }
+
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java
index 62c917500..75f8998a0 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java
@@ -30,6 +30,7 @@ import org.apache.inlong.manager.common.pojo.source.binlog.BinlogSourceResponse;
import org.apache.inlong.manager.common.pojo.source.kafka.KafkaOffset;
import org.apache.inlong.manager.common.pojo.source.kafka.KafkaSourceResponse;
import org.apache.inlong.manager.common.pojo.source.pulsar.PulsarSourceResponse;
+import org.apache.inlong.manager.common.pojo.source.postgres.PostgresSourceResponse;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamFieldInfo;
import org.apache.inlong.sort.protocol.FieldInfo;
import org.apache.inlong.sort.protocol.enums.ScanStartupMode;
@@ -37,6 +38,7 @@ import org.apache.inlong.sort.protocol.node.ExtractNode;
import org.apache.inlong.sort.protocol.node.extract.KafkaExtractNode;
import org.apache.inlong.sort.protocol.node.extract.MySqlExtractNode;
import org.apache.inlong.sort.protocol.node.extract.PulsarExtractNode;
+import org.apache.inlong.sort.protocol.node.extract.PostgresExtractNode;
import org.apache.inlong.sort.protocol.node.format.AvroFormat;
import org.apache.inlong.sort.protocol.node.format.CanalJsonFormat;
import org.apache.inlong.sort.protocol.node.format.CsvFormat;
@@ -71,6 +73,8 @@ public class ExtractNodeUtils {
return createExtractNode((KafkaSourceResponse) sourceResponse);
case PULSAR:
return createExtractNode((PulsarSourceResponse) sourceResponse);
+ case POSTGRES:
+ return createExtractNode((PostgresSourceResponse) sourceResponse);
default:
throw new IllegalArgumentException(
String.format("Unsupported sourceType=%s to create extractNode", sourceType));
@@ -249,4 +253,25 @@ public class ExtractNodeUtils {
startupMode.getValue(),
primaryKey);
}
+
+ /**
+ * Build the Postgres extract node described by a Postgres source response.
+ *
+ * @param postgresSourceResponse Postgres source response providing the source name, stream
+ *         fields, connection settings, table names and decoding plugin name
+ * @return Postgres extract node whose id and name are both the source name
+ */
+ public static PostgresExtractNode createExtractNode(PostgresSourceResponse postgresSourceResponse) {
+     final List<InlongStreamFieldInfo> sourceFields = postgresSourceResponse.getFieldList();
+     // The source name is used both as the node id and as the node name.
+     final String nodeId = postgresSourceResponse.getSourceName();
+     final String nodeName = postgresSourceResponse.getSourceName();
+
+     // Convert every stream field into the sort protocol's field description.
+     final List<FieldInfo> fieldInfos = sourceFields.stream()
+             .map(sourceField -> FieldInfoUtils.parseStreamFieldInfo(sourceField, nodeName))
+             .collect(Collectors.toList());
+
+     return new PostgresExtractNode(
+             nodeId,
+             nodeName,
+             fieldInfos,
+             null,
+             null,
+             postgresSourceResponse.getPrimaryKey(),
+             postgresSourceResponse.getTableNames(),
+             postgresSourceResponse.getHostname(),
+             postgresSourceResponse.getUsername(),
+             postgresSourceResponse.getPassword(),
+             postgresSourceResponse.getDatabase(),
+             postgresSourceResponse.getSchema(),
+             postgresSourceResponse.getPort(),
+             postgresSourceResponse.getDecodingPluginName());
+ }
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/LoadNodeUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/LoadNodeUtils.java
index 2cd25a033..504f4fc73 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/LoadNodeUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/LoadNodeUtils.java
@@ -27,6 +27,7 @@ import org.apache.inlong.manager.common.pojo.sink.SinkResponse;
import org.apache.inlong.manager.common.pojo.sink.hbase.HbaseSinkResponse;
import org.apache.inlong.manager.common.pojo.sink.hive.HiveSinkResponse;
import org.apache.inlong.manager.common.pojo.sink.kafka.KafkaSinkResponse;
+import org.apache.inlong.manager.common.pojo.sink.postgres.PostgresSinkResponse;
import org.apache.inlong.sort.protocol.FieldInfo;
import org.apache.inlong.sort.protocol.node.LoadNode;
import org.apache.inlong.sort.protocol.node.format.AvroFormat;
@@ -38,6 +39,7 @@ import org.apache.inlong.sort.protocol.node.format.JsonFormat;
import org.apache.inlong.sort.protocol.node.load.HbaseLoadNode;
import org.apache.inlong.sort.protocol.node.load.HiveLoadNode;
import org.apache.inlong.sort.protocol.node.load.KafkaLoadNode;
+import org.apache.inlong.sort.protocol.node.load.PostgresLoadNode;
import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
import java.util.List;
@@ -72,6 +74,8 @@ public class LoadNodeUtils {
return createLoadNode((HiveSinkResponse) sinkResponse);
case HBASE:
return createLoadNode((HbaseSinkResponse) sinkResponse);
+ case POSTGRES:
+ return createLoadNode((PostgresSinkResponse) sinkResponse);
default:
throw new IllegalArgumentException(
String.format("Unsupported sinkType=%s to create loadNode", sinkType));
@@ -222,6 +226,29 @@ public class LoadNodeUtils {
}
/**
+ * create postgres load node
+ * @param postgresSinkResponse postgresSinkResponse
+ * @return postgres load node
+ */
+ public static PostgresLoadNode createLoadNode(PostgresSinkResponse postgresSinkResponse) {
+ List<SinkFieldResponse> sinkFieldResponses = postgresSinkResponse.getFieldList();
+
+ String name = postgresSinkResponse.getSinkName();
+ List<FieldInfo> fields = sinkFieldResponses.stream()
+ .map(sinkFieldResponse -> FieldInfoUtils.parseSinkFieldInfo(sinkFieldResponse,
+ name))
+ .collect(Collectors.toList());
+ List<FieldRelationShip> fieldRelationShips = parseSinkFields(sinkFieldResponses, name);
+ return new PostgresLoadNode(postgresSinkResponse.getSinkName(),
+ postgresSinkResponse.getSinkName(),
+ fields, fieldRelationShips, null, null, 1,
+ null, postgresSinkResponse.getJdbcUrl(), postgresSinkResponse.getUsername(),
+ postgresSinkResponse.getPassword(),
+ postgresSinkResponse.getDbName() + "." + postgresSinkResponse.getTableName(),
+ postgresSinkResponse.getPrimaryKey());
+ }
+
+ /**
* Parse information field of data sink.
*/
public static List<FieldRelationShip> parseSinkFields(List<SinkFieldResponse> sinkFieldResponses, String sinkName) {
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/postgres/PostgresSourceOperation.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/postgres/PostgresSourceOperation.java
new file mode 100644
index 000000000..6f813f9a4
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/postgres/PostgresSourceOperation.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.source.postgres;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.github.pagehelper.Page;
+import com.github.pagehelper.PageInfo;
+import java.util.function.Supplier;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.enums.SourceType;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
+import org.apache.inlong.manager.common.pojo.source.SourceRequest;
+import org.apache.inlong.manager.common.pojo.source.SourceResponse;
+import org.apache.inlong.manager.common.pojo.source.postgres.PostgresSourceDTO;
+import org.apache.inlong.manager.common.pojo.source.postgres.PostgresSourceListResponse;
+import org.apache.inlong.manager.common.pojo.source.postgres.PostgresSourceRequest;
+import org.apache.inlong.manager.common.pojo.source.postgres.PostgresSourceResponse;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
+import org.apache.inlong.manager.service.source.AbstractSourceOperation;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+/**
+ * Stream source operation for Postgres sources.
+ *
+ * <p>Postgres-specific settings are converted to a {@link PostgresSourceDTO}, serialized to JSON and
+ * stored in the {@code extParams} property of {@link StreamSourceEntity}; they are parsed back from
+ * there when an entity is turned into a response object.</p>
+ */
+@Service
+public class PostgresSourceOperation extends AbstractSourceOperation {
+
+    @Autowired
+    private ObjectMapper objectMapper;
+
+    /**
+     * Tells whether this operation handles the given source type.
+     *
+     * @param sourceType source type to check
+     * @return true only for {@link SourceType#POSTGRES}
+     */
+    @Override
+    public Boolean accept(SourceType sourceType) {
+        return SourceType.POSTGRES == sourceType;
+    }
+
+    /**
+     * @return the type string of {@link SourceType#POSTGRES}
+     */
+    @Override
+    protected String getSourceType() {
+        return SourceType.POSTGRES.getType();
+    }
+
+    /**
+     * @return a new, empty {@link PostgresSourceResponse}
+     */
+    @Override
+    protected SourceResponse getResponse() {
+        return new PostgresSourceResponse();
+    }
+
+    /**
+     * Converts a page of source entities into a page of Postgres list responses.
+     *
+     * @param entityPage page of source entities
+     * @return an empty {@link PageInfo} when the page is empty, otherwise the converted page
+     */
+    @Override
+    public PageInfo<? extends SourceListResponse> getPageInfo(Page<StreamSourceEntity> entityPage) {
+        if (CollectionUtils.isEmpty(entityPage)) {
+            return new PageInfo<>();
+        }
+        return entityPage.toPageInfo(entity -> this.getFromEntity(entity, PostgresSourceListResponse::new));
+    }
+
+    /**
+     * Copies the request into the target entity and stores the Postgres-specific settings as JSON
+     * in the entity's {@code extParams}.
+     *
+     * @param request source request, expected to be a {@link PostgresSourceRequest}
+     * @param targetEntity entity to fill
+     * @throws BusinessException if the settings cannot be converted or serialized; the message
+     *         contains the underlying failure reason
+     */
+    @Override
+    protected void setTargetEntity(SourceRequest request, StreamSourceEntity targetEntity) {
+        PostgresSourceRequest sourceRequest = (PostgresSourceRequest) request;
+        CommonBeanUtils.copyProperties(sourceRequest, targetEntity, true);
+        try {
+            PostgresSourceDTO dto = PostgresSourceDTO.getFromRequest(sourceRequest);
+            targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
+        } catch (Exception e) {
+            // Keep the underlying reason in the message; previously it was silently discarded,
+            // leaving only the generic error text.
+            throw new BusinessException(
+                    ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage() + ": " + e.getMessage());
+        }
+    }
+
+    /**
+     * Creates a target object from a source entity.
+     *
+     * @param entity source entity, may be null
+     * @param target supplier of the object to fill
+     * @param <T> type of the object to fill
+     * @return the untouched supplied object when the entity is null; otherwise the supplied object
+     *         with properties copied from the entity and then from the DTO parsed out of its
+     *         {@code extParams}
+     */
+    @Override
+    public <T> T getFromEntity(StreamSourceEntity entity, Supplier<T> target) {
+        T result = target.get();
+        if (entity == null) {
+            return result;
+        }
+        // Refuse entities that were saved with a different source type.
+        String existType = entity.getSourceType();
+        Preconditions.checkTrue(getSourceType().equals(existType),
+                String.format(ErrorCodeEnum.SOURCE_TYPE_NOT_SAME.getMessage(), getSourceType(), existType));
+        PostgresSourceDTO dto = PostgresSourceDTO.getFromJson(entity.getExtParams());
+        // The DTO is copied last, so its properties are applied on top of the entity's.
+        CommonBeanUtils.copyProperties(entity, result, true);
+        CommonBeanUtils.copyProperties(dto, result, true);
+        return result;
+    }
+}
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/PostgresStreamSinkServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/PostgresStreamSinkServiceTest.java
new file mode 100644
index 000000000..f5eda55fe
--- /dev/null
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/PostgresStreamSinkServiceTest.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.core.sink;
+
+import org.apache.inlong.manager.common.enums.GlobalConstants;
+import org.apache.inlong.manager.common.enums.SinkType;
+import org.apache.inlong.manager.common.pojo.sink.SinkResponse;
+import org.apache.inlong.manager.common.pojo.sink.postgres.PostgresSinkRequest;
+import org.apache.inlong.manager.common.pojo.sink.postgres.PostgresSinkResponse;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.service.ServiceBaseTest;
+import org.apache.inlong.manager.service.core.impl.InlongStreamServiceTest;
+import org.apache.inlong.manager.service.sink.StreamSinkService;
+import org.junit.Assert;
+import org.junit.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+
+/**
+ * Tests saving, reading, updating and deleting a Postgres sink through {@link StreamSinkService}.
+ */
+public class PostgresStreamSinkServiceTest extends ServiceBaseTest {
+
+    private static final String GLOBAL_GROUP_ID = "b_group1";
+    private static final String GLOBAL_STREAM_ID = "stream1";
+    private static final String GLOBAL_OPERATOR = "admin";
+
+    @Autowired
+    private StreamSinkService sinkService;
+    @Autowired
+    private InlongStreamServiceTest streamServiceTest;
+
+    /**
+     * Saves the test stream, then saves a Postgres sink with the given name for it.
+     *
+     * @param sinkName name of the sink to save
+     * @return value returned by the sink service for the saved sink, used as the sink id by the tests
+     */
+    public Integer saveSink(String sinkName) {
+        streamServiceTest.saveInlongStream(GLOBAL_GROUP_ID, GLOBAL_STREAM_ID, GLOBAL_OPERATOR);
+
+        PostgresSinkRequest request = new PostgresSinkRequest();
+        // Identity of the sink.
+        request.setInlongGroupId(GLOBAL_GROUP_ID);
+        request.setInlongStreamId(GLOBAL_STREAM_ID);
+        request.setSinkType(SinkType.SINK_POSTGRES);
+
+        // Postgres connection and table settings.
+        request.setJdbcUrl("jdbc:postgresql://localhost:5432/postgres");
+        request.setUsername("postgres");
+        request.setPassword("inlong");
+        request.setDbName("public");
+        request.setTableName("user");
+        request.setPrimaryKey("name,age");
+
+        request.setSinkName(sinkName);
+        request.setEnableCreateResource(GlobalConstants.DISABLE_CREATE_RESOURCE);
+        return sinkService.save(request, GLOBAL_OPERATOR);
+    }
+
+    /**
+     * Deletes the Postgres sink with the given id and asserts that the deletion succeeded.
+     *
+     * @param postgresSinkId id of the sink to delete
+     */
+    public void deletePostgresSink(Integer postgresSinkId) {
+        Assert.assertTrue(sinkService.delete(postgresSinkId, GLOBAL_OPERATOR));
+    }
+
+    /**
+     * Saves a sink, reads it back by id and checks its group id.
+     */
+    @Test
+    public void testListByIdentifier() {
+        Integer sinkId = this.saveSink("postgres_default1");
+        SinkResponse saved = sinkService.get(sinkId);
+        Assert.assertEquals(GLOBAL_GROUP_ID, saved.getInlongGroupId());
+        deletePostgresSink(sinkId);
+    }
+
+    /**
+     * Saves a sink, reads it back, enables resource creation and updates the sink.
+     */
+    @Test
+    public void testGetAndUpdate() {
+        Integer sinkId = this.saveSink("postgres_default2");
+        SinkResponse saved = sinkService.get(sinkId);
+        Assert.assertEquals(GLOBAL_GROUP_ID, saved.getInlongGroupId());
+
+        PostgresSinkResponse postgresSink = (PostgresSinkResponse) saved;
+        postgresSink.setEnableCreateResource(GlobalConstants.ENABLE_CREATE_RESOURCE);
+
+        // Build the update request from the modified response.
+        PostgresSinkRequest updateRequest = CommonBeanUtils.copyProperties(postgresSink, PostgresSinkRequest::new);
+        Assert.assertTrue(sinkService.update(updateRequest, GLOBAL_OPERATOR));
+        deletePostgresSink(sinkId);
+    }
+
+}