You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/06/02 04:26:01 UTC

[incubator-inlong] branch master updated: [INLONG-4356][Manager] Support creating Postgres resource (#4357)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 3721efc1d [INLONG-4356][Manager] Support creating Postgres resource (#4357)
3721efc1d is described below

commit 3721efc1d0208e10676f302078552330073e6cf7
Author: baomingyu <ba...@163.com>
AuthorDate: Wed Jun 1 23:25:56 2022 -0500

    [INLONG-4356][Manager] Support creating Postgres resource (#4357)
    
    * [INLONG-4356][Manager] Support creating Postgres resource
    
    * [INLONG-4381][Manager] Optimize the Postgres related codes
    
    Co-authored-by: healchow <he...@gmail.com>
---
 .../pojo/sink/postgres/PostgresColumnInfo.java     |  30 ++++
 .../common/pojo/sink/postgres/PostgresSinkDTO.java |  23 ++-
 .../pojo/sink/postgres/PostgresTableInfo.java      |  36 ++++
 .../resource/postgres/PostgresJdbcUtils.java       | 185 +++++++++++++++++++++
 .../postgres/PostgresResourceOperator.java         | 144 ++++++++++++++++
 .../resource/postgres/PostgresSqlBuilder.java      | 123 ++++++++++++++
 .../core/sink/PostgresStreamSinkServiceTest.java   |  63 +++++++
 7 files changed, 600 insertions(+), 4 deletions(-)

diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/postgres/PostgresColumnInfo.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/postgres/PostgresColumnInfo.java
new file mode 100644
index 000000000..c28ac2267
--- /dev/null
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/postgres/PostgresColumnInfo.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.pojo.sink.postgres;
+
+import lombok.Data;
+
+/**
+ * Postgres column info.
+ *
+ * <p>Plain data holder for one table column; Lombok's {@code @Data} generates the
+ * getters, setters, {@code equals}/{@code hashCode} and {@code toString}.
+ */
+@Data
+public class PostgresColumnInfo {
+    /** Column name. */
+    private String name;
+    /** Column data type, kept as a plain string (presumably a Postgres type name such as "integer"). */
+    private String type;
+    /** Column description or comment. */
+    private String desc;
+}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/postgres/PostgresSinkDTO.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/postgres/PostgresSinkDTO.java
index 69f1f5261..fa76c5d65 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/postgres/PostgresSinkDTO.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/postgres/PostgresSinkDTO.java
@@ -20,8 +20,6 @@ package org.apache.inlong.manager.common.pojo.sink.postgres;
 import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import io.swagger.annotations.ApiModelProperty;
-import java.util.Map;
-import javax.validation.constraints.NotNull;
 import lombok.AllArgsConstructor;
 import lombok.Builder;
 import lombok.Data;
@@ -29,6 +27,10 @@ import lombok.NoArgsConstructor;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 
+import javax.validation.constraints.NotNull;
+import java.util.List;
+import java.util.Map;
+
 /**
  * Postgres sink info
  */
@@ -77,8 +79,9 @@ public class PostgresSinkDTO {
     }
 
     /**
-     *  get DTO from json
-     * @param extParams extParams
+     * Get Postgres sink info from JSON string
+     *
+     * @param extParams JSON string
      * @return postgres sink DTO
      */
     public static PostgresSinkDTO getFromJson(@NotNull String extParams) {
@@ -90,4 +93,16 @@ public class PostgresSinkDTO {
         }
     }
 
+    /**
+     * Assemble the table description of a Postgres sink.
+     *
+     * @param pgSink sink info that supplies the database name, table name and primary key
+     * @param columnList columns to attach to the table, stored as passed in (not copied)
+     * @return a new table info populated from the two arguments
+     */
+    public static PostgresTableInfo getPostgresTableInfo(PostgresSinkDTO pgSink, List<PostgresColumnInfo> columnList) {
+        PostgresTableInfo result = new PostgresTableInfo();
+        result.setColumns(columnList);
+        result.setPrimaryKey(pgSink.getPrimaryKey());
+        result.setTableName(pgSink.getTableName());
+        result.setDbName(pgSink.getDbName());
+        return result;
+    }
+
 }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/postgres/PostgresTableInfo.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/postgres/PostgresTableInfo.java
new file mode 100644
index 000000000..9ebc885a9
--- /dev/null
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/postgres/PostgresTableInfo.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.pojo.sink.postgres;
+
+import java.util.List;
+import lombok.Data;
+
+/**
+ * Postgres table info.
+ *
+ * <p>Plain data holder describing one table; Lombok's {@code @Data} generates the accessors.
+ */
+@Data
+public class PostgresTableInfo {
+    /** Name of the database the table belongs to. */
+    private String dbName;
+    /** Name of the table. */
+    private String tableName;
+    /** Table description or comment. */
+    private String tableDesc;
+
+    /** Partition expression - NOTE(review): presumably reserved for later DDL support, confirm. */
+    private String partitionBy;
+    /** Sort expression - NOTE(review): presumably reserved for later DDL support, confirm. */
+    private String orderBy;
+    /** Primary key of the table. */
+    private String primaryKey;
+    /** Columns of the table. */
+    private List<PostgresColumnInfo> columns;
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/postgres/PostgresJdbcUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/postgres/PostgresJdbcUtils.java
new file mode 100644
index 000000000..f92244ce5
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/postgres/PostgresJdbcUtils.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.resource.postgres;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.pojo.sink.postgres.PostgresColumnInfo;
+import org.apache.inlong.manager.common.pojo.sink.postgres.PostgresTableInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Utils for Postgres JDBC.
+ *
+ * <p>Every helper that takes a url opens its own connection and releases all JDBC
+ * resources (connection, statement, result set) before it returns.
+ */
+public class PostgresJdbcUtils {
+
+    private static final String POSTGRES_DRIVER_CLASS = "org.postgresql.Driver";
+    private static final String SCHEMA_PATTERN = "public";
+    private static final String TABLE_TYPE = "TABLE";
+    private static final String COLUMN_LABEL_TABLE = "TABLE_NAME";
+    private static final String COLUMN_LABEL_COUNT = "count";
+    private static final String POSTGRES_JDBC_PREFIX = "jdbc:postgresql";
+
+    private static final Logger LOG = LoggerFactory.getLogger(PostgresJdbcUtils.class);
+
+    /**
+     * Get Postgres connection from Postgres url and user.
+     *
+     * @param url JDBC url, must start with {@code jdbc:postgresql}
+     * @param user user name
+     * @param password password of the user
+     * @return an open connection; the caller is responsible for closing it
+     * @throws Exception if the url is blank or has another prefix, or the connection cannot be opened
+     */
+    public static Connection getConnection(String url, String user, String password)
+            throws Exception {
+        if (StringUtils.isBlank(url) || !url.startsWith(POSTGRES_JDBC_PREFIX)) {
+            throw new Exception("Postgres server URL was invalid, it should start with jdbc:postgresql");
+        }
+        Connection conn;
+        try {
+            Class.forName(POSTGRES_DRIVER_CLASS);
+            conn = DriverManager.getConnection(url, user, password);
+        } catch (Exception e) {
+            String errorMsg = "get postgres connection error, please check postgres jdbc url, username or password!";
+            LOG.error(errorMsg, e);
+            // attach the original exception as cause so callers keep the full stack trace
+            throw new Exception(errorMsg + " other error msg: " + e.getMessage(), e);
+        }
+        if (conn == null) {
+            throw new Exception("get postgres connection failed, please contact administrator");
+        }
+        LOG.info("get postgres connection success, url={}", url);
+        return conn;
+    }
+
+    /**
+     * Execute one Postgres SQL command.
+     *
+     * @return the value of {@link Statement#execute(String)}: true if the first result is a
+     *         ResultSet, false if it is an update count or there is no result; failures are
+     *         reported by exception, not by the return value
+     * @throws Exception if the connection cannot be opened or the statement fails
+     */
+    public static boolean executeSql(String sql, String url, String user, String password) throws Exception {
+        try (Connection conn = getConnection(url, user, password);
+                Statement stmt = conn.createStatement()) {
+            boolean hasResultSet = stmt.execute(sql);
+            // log success only after the statement has really been executed
+            LOG.info("execute sql [{}] success for url: {}", sql, url);
+            return hasResultSet;
+        }
+    }
+
+    /**
+     * Execute one Postgres query and return its rows.
+     *
+     * <p>A driver ResultSet becomes unusable once its connection is closed, and the connection
+     * is closed before this method returns. The rows are therefore copied into a disconnected
+     * {@link javax.sql.rowset.CachedRowSet}, which is itself a {@link ResultSet}.
+     *
+     * @return an in-memory copy of the query result that stays readable after the connection is closed
+     * @throws Exception if the connection cannot be opened or the query fails
+     */
+    public static ResultSet executeQuerySql(String sql, String url, String user, String password)
+            throws Exception {
+        try (Connection conn = getConnection(url, user, password);
+                Statement stmt = conn.createStatement();
+                ResultSet rs = stmt.executeQuery(sql)) {
+            javax.sql.rowset.CachedRowSet rows = javax.sql.rowset.RowSetProvider.newFactory().createCachedRowSet();
+            rows.populate(rs);
+            LOG.info("execute sql [{}] success for url: {}", sql, url);
+            return rows;
+        }
+    }
+
+    /**
+     * Execute several Postgres SQL commands one after another on a single connection.
+     *
+     * @param sql the commands, executed in list order; the first failure aborts the rest
+     * @throws Exception if the connection cannot be opened or any command fails
+     */
+    public static void executeSqlBatch(List<String> sql, String url, String user, String password)
+            throws Exception {
+        try (Connection conn = getConnection(url, user, password);
+                Statement stmt = conn.createStatement()) {
+            for (String entry : sql) {
+                stmt.execute(entry);
+            }
+            LOG.info("execute sql [{}] success for url: {}", sql, url);
+        }
+    }
+
+    /**
+     * Create the Postgres database if the check query reports that it does not exist.
+     *
+     * <p>The check query is expected to return one row with a column labelled {@code count}.
+     *
+     * @throws Exception if the connection cannot be opened or a statement fails
+     */
+    public static void createDb(String url, String user, String password, String dbName) throws Exception {
+        String checkDbSql = PostgresSqlBuilder.getCheckDatabase(dbName);
+        boolean dbMissing;
+        // evaluate the result while the connection is open; a ResultSet dies with its connection
+        try (Connection conn = getConnection(url, user, password);
+                Statement stmt = conn.createStatement();
+                ResultSet rs = stmt.executeQuery(checkDbSql)) {
+            dbMissing = rs.next() && rs.getInt(COLUMN_LABEL_COUNT) == 0;
+        }
+        if (dbMissing) {
+            String createDbSql = PostgresSqlBuilder.buildCreateDbSql(dbName);
+            executeSql(createDbSql, url, user, password);
+        }
+    }
+
+    /**
+     * Create Postgres table from the given table info.
+     *
+     * @throws Exception if the connection cannot be opened or the statement fails
+     */
+    public static void createTable(String url, String user, String password, PostgresTableInfo tableInfo)
+            throws Exception {
+        String createTableSql = PostgresSqlBuilder.buildCreateTableSql(tableInfo);
+        PostgresJdbcUtils.executeSql(createTableSql, url, user, password);
+    }
+
+    /**
+     * Check through the JDBC metadata whether the table exists in the {@code public} schema
+     * of the database the url points to.
+     *
+     * @param dbName only used in the log message
+     * @param tableName exact name of the table to look for
+     * @return true if a table with exactly this name was found
+     * @throws Exception if the connection cannot be opened or the metadata lookup fails
+     */
+    public static boolean checkTablesExist(String url, String user, String password, String dbName, String tableName)
+            throws Exception {
+        boolean result = false;
+        try (Connection conn = getConnection(url, user, password)) {
+            DatabaseMetaData metaData = conn.getMetaData();
+            try (ResultSet rs = metaData.getTables(
+                    conn.getCatalog(), SCHEMA_PATTERN, tableName, new String[]{TABLE_TYPE})) {
+                // getTables treats the name as a pattern ('_' and '%' are wildcards), so several
+                // tables may match; scan all of them for the exact name instead of only the first
+                while (!result && rs.next()) {
+                    result = tableName.equals(rs.getString(COLUMN_LABEL_TABLE));
+                }
+            }
+            LOG.info("check table exist for db={} table={}, result={}", dbName, tableName, result);
+        }
+        return result;
+    }
+
+    /**
+     * Query the columns of a Postgres table.
+     *
+     * @return one entry per returned row, with the name taken from the first result column and
+     *         the type from the second; the description is not filled
+     * @throws Exception if the connection cannot be opened or the query fails
+     */
+    public static List<PostgresColumnInfo> getColumns(String url, String user, String password, String tableName)
+            throws Exception {
+        String querySql = PostgresSqlBuilder.buildDescTableSql(tableName);
+        try (Connection conn = getConnection(url, user, password);
+                Statement stmt = conn.createStatement();
+                ResultSet rs = stmt.executeQuery(querySql)) {
+            List<PostgresColumnInfo> columnList = new ArrayList<>();
+
+            while (rs.next()) {
+                PostgresColumnInfo columnInfo = new PostgresColumnInfo();
+                columnInfo.setName(rs.getString(1));
+                columnInfo.setType(rs.getString(2));
+                columnList.add(columnInfo);
+            }
+            return columnList;
+        }
+    }
+
+    /**
+     * Add columns for Postgres table, one ALTER TABLE statement per column.
+     *
+     * @throws Exception if the connection cannot be opened or a statement fails
+     */
+    public static void addColumns(String url, String user, String password, String tableName,
+            List<PostgresColumnInfo> columnList) throws Exception {
+        List<String> addColumnSql = PostgresSqlBuilder.buildAddColumnsSql(tableName, columnList);
+        PostgresJdbcUtils.executeSqlBatch(addColumnSql, url, user, password);
+    }
+
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/postgres/PostgresResourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/postgres/PostgresResourceOperator.java
new file mode 100644
index 000000000..d4ab18bc0
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/postgres/PostgresResourceOperator.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.resource.postgres;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.inlong.manager.common.enums.GlobalConstants;
+import org.apache.inlong.manager.common.enums.SinkStatus;
+import org.apache.inlong.manager.common.enums.SinkType;
+import org.apache.inlong.manager.common.exceptions.WorkflowException;
+import org.apache.inlong.manager.common.pojo.sink.SinkInfo;
+import org.apache.inlong.manager.common.pojo.sink.postgres.PostgresColumnInfo;
+import org.apache.inlong.manager.common.pojo.sink.postgres.PostgresSinkDTO;
+import org.apache.inlong.manager.common.pojo.sink.postgres.PostgresTableInfo;
+import org.apache.inlong.manager.dao.entity.StreamSinkFieldEntity;
+import org.apache.inlong.manager.dao.mapper.StreamSinkFieldEntityMapper;
+import org.apache.inlong.manager.service.resource.SinkResourceOperator;
+import org.apache.inlong.manager.service.sink.StreamSinkService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Postgres resource operator: creates the database, table and missing columns
+ * that a Postgres sink needs, and records the outcome in the sink status.
+ */
+@Service
+public class PostgresResourceOperator implements SinkResourceOperator {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(PostgresResourceOperator.class);
+
+    @Autowired
+    private StreamSinkService sinkService;
+
+    @Autowired
+    private StreamSinkFieldEntityMapper postgresFieldMapper;
+
+    /**
+     * Tell whether this operator handles the given sink type.
+     *
+     * @return true only for {@link SinkType#POSTGRES}
+     */
+    @Override
+    public Boolean accept(SinkType sinkType) {
+        return SinkType.POSTGRES == sinkType;
+    }
+
+    /**
+     * Create Postgres sink resource.
+     *
+     * <p>Nothing is done when the sink is already configured successfully or when
+     * resource creation is disabled for it.
+     *
+     * @param sinkInfo Postgres sink info
+     */
+    @Override
+    public void createSinkResource(SinkInfo sinkInfo) {
+        LOGGER.info("begin to create postgres res sinkId={}", sinkInfo.getId());
+        if (SinkStatus.CONFIG_SUCCESSFUL.getCode().equals(sinkInfo.getStatus())) {
+            LOGGER.warn("postgres resource [{}] already success, skip to create", sinkInfo.getId());
+            return;
+        } else if (GlobalConstants.DISABLE_CREATE_RESOURCE.equals(sinkInfo.getEnableCreateResource())) {
+            LOGGER.warn("create resource was disabled, skip to create for [{}]", sinkInfo.getId());
+            return;
+        }
+        this.createTable(sinkInfo);
+    }
+
+    /**
+     * Create the Postgres table of the sink, or add the columns it is missing.
+     *
+     * <p>On success the sink status becomes CONFIG_SUCCESSFUL; on any failure it becomes
+     * CONFIG_FAILED and a {@link WorkflowException} is thrown.
+     */
+    private void createTable(SinkInfo sinkInfo) {
+        LOGGER.info("begin to create postgres table for sinkId={}", sinkInfo.getId());
+        List<StreamSinkFieldEntity> fieldList = postgresFieldMapper.selectBySinkId(sinkInfo.getId());
+        if (CollectionUtils.isEmpty(fieldList)) {
+            // really skip, as the message says; this also protects the loop below from a null list
+            LOGGER.warn("no postgres fields found, skip to create table for sinkId={}", sinkInfo.getId());
+            return;
+        }
+
+        // set columns
+        List<PostgresColumnInfo> columnList = buildColumns(fieldList);
+
+        try {
+            PostgresSinkDTO pgInfo = PostgresSinkDTO.getFromJson(sinkInfo.getExtParams());
+            PostgresTableInfo tableInfo = PostgresSinkDTO.getPostgresTableInfo(pgInfo, columnList);
+            String url = pgInfo.getJdbcUrl();
+            String user = pgInfo.getUsername();
+            String password = pgInfo.getPassword();
+
+            String dbName = tableInfo.getDbName();
+            String tableName = tableInfo.getTableName();
+
+            // 1. create database if not exists
+            PostgresJdbcUtils.createDb(url, user, password, dbName);
+
+            // 2. check if the table exists
+            boolean tableExists = PostgresJdbcUtils.checkTablesExist(url, user, password, dbName, tableName);
+
+            if (!tableExists) {
+                // 3. table not exists, create it
+                PostgresJdbcUtils.createTable(url, user, password, tableInfo);
+            } else {
+                // 4. table exists, add columns - skip the exists columns
+                addMissingColumns(url, user, password, tableName, tableInfo.getColumns());
+            }
+
+            // 5. update the sink status to success
+            String info = "success to create postgres resource";
+            sinkService.updateStatus(sinkInfo.getId(), SinkStatus.CONFIG_SUCCESSFUL.getCode(), info);
+            LOGGER.info(info + " for sinkInfo={}", sinkInfo);
+        } catch (Throwable e) {
+            String errMsg = "create postgres table failed: " + e.getMessage();
+            LOGGER.error(errMsg, e);
+            sinkService.updateStatus(sinkInfo.getId(), SinkStatus.CONFIG_FAILED.getCode(), errMsg);
+            throw new WorkflowException(errMsg);
+        }
+
+        LOGGER.info("success create postgres table for data sink [{}]", sinkInfo.getId());
+    }
+
+    /**
+     * Convert the stored sink fields into Postgres column infos (name, type, comment).
+     */
+    private static List<PostgresColumnInfo> buildColumns(List<StreamSinkFieldEntity> fieldList) {
+        List<PostgresColumnInfo> columnList = new ArrayList<>(fieldList.size());
+        for (StreamSinkFieldEntity field : fieldList) {
+            PostgresColumnInfo columnInfo = new PostgresColumnInfo();
+            columnInfo.setName(field.getFieldName());
+            columnInfo.setType(field.getFieldType());
+            columnInfo.setDesc(field.getFieldComment());
+            columnList.add(columnInfo);
+        }
+        return columnList;
+    }
+
+    /**
+     * Add those wanted columns to the existing table whose names are not present yet.
+     */
+    private static void addMissingColumns(String url, String user, String password, String tableName,
+            List<PostgresColumnInfo> wantedColumns) throws Exception {
+        List<String> existNames = PostgresJdbcUtils.getColumns(url, user, password, tableName).stream()
+                .map(PostgresColumnInfo::getName)
+                .collect(Collectors.toList());
+        List<PostgresColumnInfo> needAddColumns = wantedColumns.stream()
+                .filter(column -> !existNames.contains(column.getName()))
+                .collect(Collectors.toList());
+        if (CollectionUtils.isNotEmpty(needAddColumns)) {
+            PostgresJdbcUtils.addColumns(url, user, password, tableName, needAddColumns);
+        }
+    }
+
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/postgres/PostgresSqlBuilder.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/postgres/PostgresSqlBuilder.java
new file mode 100644
index 000000000..26ab4043e
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/postgres/PostgresSqlBuilder.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.resource.postgres;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.pojo.sink.postgres.PostgresColumnInfo;
+import org.apache.inlong.manager.common.pojo.sink.postgres.PostgresTableInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Builder for Postgres SQL string.
+ *
+ * <p>Values placed inside SQL string literals are escaped. Database, table and column names
+ * are inserted verbatim as identifiers, so callers must pass trusted, valid identifiers.
+ */
+public class PostgresSqlBuilder {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(PostgresSqlBuilder.class);
+
+    /**
+     * Build check database exists SQL.
+     *
+     * @return a query yielding one row with a single {@code count} column: the number of
+     *         databases named {@code dbName}
+     */
+    public static String getCheckDatabase(String dbName) {
+        String sql = "SELECT count(1) FROM pg_catalog.pg_database WHERE datname = '"
+                + escapeLiteral(dbName) + "'";
+        LOGGER.info("check database sql: {}", sql);
+        return sql;
+    }
+
+    /**
+     * Build create database SQL.
+     */
+    public static String buildCreateDbSql(String dbName) {
+        String sql = "CREATE DATABASE " + dbName;
+        LOGGER.info("create db sql: {}", sql);
+        return sql;
+    }
+
+    /**
+     * Build create table SQL from the table name and the columns of the table info.
+     */
+    public static String buildCreateTableSql(PostgresTableInfo table) {
+        StringBuilder sql = new StringBuilder();
+        sql.append("CREATE TABLE ").append(table.getTableName());
+
+        // append the column definitions
+        sql.append(buildCreateColumnsSql(table.getColumns()));
+
+        LOGGER.info("create table sql: {}", sql);
+        return sql.toString();
+    }
+
+    /**
+     * Build add column SQL.
+     *
+     * @return one ALTER TABLE ... ADD COLUMN statement per column, in list order
+     */
+    public static List<String> buildAddColumnsSql(String tableName, List<PostgresColumnInfo> columnList) {
+        List<String> columnInfoList = getColumnsInfo(columnList);
+        List<String> resultList = new ArrayList<>();
+        for (String columnInfo : columnInfoList) {
+            String sql = "ALTER TABLE " + tableName + " ADD COLUMN " + columnInfo;
+            resultList.add(sql);
+        }
+
+        LOGGER.info("add columns sql={}", resultList);
+        return resultList;
+    }
+
+    /**
+     * Build the parenthesised, comma separated column list of a CREATE TABLE statement.
+     */
+    private static String buildCreateColumnsSql(List<PostgresColumnInfo> columns) {
+        List<String> columnList = getColumnsInfo(columns);
+        String sql = " (" + StringUtils.join(columnList, ",") + ") ";
+        LOGGER.info("create columns sql={}", sql);
+        return sql;
+    }
+
+    /**
+     * Build the "name type" definition of every column.
+     */
+    private static List<String> getColumnsInfo(List<PostgresColumnInfo> columns) {
+        List<String> columnList = new ArrayList<>();
+        for (PostgresColumnInfo columnInfo : columns) {
+            String columnStr = columnInfo.getName() + " " + columnInfo.getType();
+            columnList.add(columnStr);
+        }
+        return columnList;
+    }
+
+    /**
+     * Build query table SQL.
+     *
+     * @return a query whose first result column is the column name and whose second is the
+     *         formatted column type, for every live user column of the table
+     */
+    public static String buildDescTableSql(String tableName) {
+        StringBuilder sql = new StringBuilder();
+        // attnum > 0 skips system columns; attisdropped skips the placeholders of dropped columns
+        sql.append("SELECT att.attname as filedName, format_type(att.atttypid, att.atttypmod) as filedType"
+                        + " FROM pg_attribute as att, pg_class as clz"
+                        + " WHERE att.attrelid = clz.oid and att.attnum > 0 and not att.attisdropped"
+                        + " and clz.relname = '")
+                .append(escapeLiteral(tableName))
+                .append("';");
+
+        LOGGER.info("desc table sql={}", sql);
+        return sql.toString();
+    }
+
+    /**
+     * Escape a value for use inside a single-quoted SQL string literal by doubling quotes.
+     */
+    private static String escapeLiteral(String value) {
+        // String.valueOf keeps the previous rendering of null as the text "null"
+        return String.valueOf(value).replace("'", "''");
+    }
+
+}
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/PostgresStreamSinkServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/PostgresStreamSinkServiceTest.java
index 02ff22663..230485e7a 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/PostgresStreamSinkServiceTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/PostgresStreamSinkServiceTest.java
@@ -17,19 +17,28 @@
 
 package org.apache.inlong.manager.service.core.sink;
 
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.inlong.manager.common.enums.GlobalConstants;
 import org.apache.inlong.manager.common.enums.SinkType;
 import org.apache.inlong.manager.common.pojo.sink.SinkRequest;
 import org.apache.inlong.manager.common.pojo.sink.StreamSink;
+import org.apache.inlong.manager.common.pojo.sink.postgres.PostgresColumnInfo;
 import org.apache.inlong.manager.common.pojo.sink.postgres.PostgresSink;
 import org.apache.inlong.manager.common.pojo.sink.postgres.PostgresSinkRequest;
+import org.apache.inlong.manager.common.pojo.sink.postgres.PostgresTableInfo;
 import org.apache.inlong.manager.service.ServiceBaseTest;
 import org.apache.inlong.manager.service.core.impl.InlongStreamServiceTest;
+import org.apache.inlong.manager.service.resource.postgres.PostgresJdbcUtils;
 import org.apache.inlong.manager.service.sink.StreamSinkService;
 import org.junit.Assert;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.springframework.beans.factory.annotation.Autowired;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
 /**
  * Stream sink service test
  */
@@ -97,4 +106,58 @@ public class PostgresStreamSinkServiceTest extends ServiceBaseTest {
         deletePostgresSink(postgresSinkId);
     }
 
+    /**
+     * Manual check of the Postgres resource helpers against a local Postgres server:
+     * creates the database and table if needed, adds missing columns, then verifies
+     * that the table exists.
+     *
+     * <p>Just using in local test. The method carries {@code @Test} so that JUnit reports it
+     * as ignored and runs it once {@code @Ignore} is removed. Exceptions propagate so that a
+     * failure is not hidden.
+     */
+    @Ignore
+    @Test
+    public void testDbResource() throws Exception {
+        String url = "jdbc:postgresql://localhost:5432/test";
+        String user = "postgres";
+        String password = "123456";
+        String dbName = "test";
+        String tableName = "test_123";
+
+        PostgresJdbcUtils.createDb(url, user, password, dbName);
+
+        List<PostgresColumnInfo> columnInfoList = new ArrayList<>();
+        for (String columnName : new String[]{"id", "age", "high"}) {
+            PostgresColumnInfo info = new PostgresColumnInfo();
+            info.setType("integer");
+            info.setName(columnName);
+            columnInfoList.add(info);
+        }
+
+        PostgresTableInfo tableInfo = new PostgresTableInfo();
+        tableInfo.setDbName(dbName);
+        tableInfo.setColumns(columnInfoList);
+        tableInfo.setTableName(tableName);
+
+        boolean tableExists = PostgresJdbcUtils.checkTablesExist(url, user, password, dbName, tableName);
+        if (!tableExists) {
+            PostgresJdbcUtils.createTable(url, user, password, tableInfo);
+        } else {
+            List<String> columnNameList = PostgresJdbcUtils.getColumns(url, user, password, tableName).stream()
+                    .map(PostgresColumnInfo::getName)
+                    .collect(Collectors.toList());
+
+            List<PostgresColumnInfo> needAddColumns = tableInfo.getColumns().stream()
+                    .filter((columnInfo) -> !columnNameList.contains(columnInfo.getName()))
+                    .collect(Collectors.toList());
+            if (CollectionUtils.isNotEmpty(needAddColumns)) {
+                PostgresJdbcUtils.addColumns(url, user, password, tableName, needAddColumns);
+            }
+        }
+
+        Assert.assertTrue(PostgresJdbcUtils.checkTablesExist(url, user, password, dbName, tableName));
+    }
+
 }