You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/08/02 08:18:58 UTC

[inlong] branch master updated: [INLONG-5084][Manager] Support create Greenplum databases and tables (#5317)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new c4663e90c [INLONG-5084][Manager] Support create Greenplum databases and tables (#5317)
c4663e90c is described below

commit c4663e90c4d47e10412a892789b18c0e88f6b62e
Author: haibo.duan <dh...@live.cn>
AuthorDate: Tue Aug 2 16:18:54 2022 +0800

    [INLONG-5084][Manager] Support create Greenplum databases and tables (#5317)
---
 .../pojo/sink/greenplum/GreenplumColumnInfo.java   |  37 +++
 .../pojo/sink/greenplum/GreenplumSinkDTO.java      |  18 ++
 .../pojo/sink/greenplum/GreenplumTableInfo.java    |  41 +++
 .../sink/greenplum/GreenplumJdbcUtils.java         | 271 ++++++++++++++++++++
 .../sink/greenplum/GreenplumResourceOperator.java  | 117 +++++++++
 .../sink/greenplum/GreenplumSqlBuilder.java        | 276 +++++++++++++++++++++
 .../service/sink/GreenplumSinkServiceTest.java     |  67 +++++
 7 files changed, 827 insertions(+)

diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/greenplum/GreenplumColumnInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/greenplum/GreenplumColumnInfo.java
new file mode 100644
index 000000000..24d18124d
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/greenplum/GreenplumColumnInfo.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sink.greenplum;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * Greenplum column info.
+ */
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class GreenplumColumnInfo {
+
+    private String name;
+
+    private String type;
+
+    private String comment;
+}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/greenplum/GreenplumSinkDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/greenplum/GreenplumSinkDTO.java
index 7b073154c..89571e84b 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/greenplum/GreenplumSinkDTO.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/greenplum/GreenplumSinkDTO.java
@@ -28,6 +28,7 @@ import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 
 import javax.validation.constraints.NotNull;
+import java.util.List;
 import java.util.Map;
 
 /**
@@ -85,4 +86,21 @@ public class GreenplumSinkDTO {
         }
     }
 
+    /**
+     * Get Greenplum table info
+     *
+     * @param greenplumSink Greenplum sink dto,{@link GreenplumSinkDTO}
+     * @param columnList Greenplum column info list,{@link GreenplumColumnInfo}
+     * @return {@link GreenplumTableInfo}
+     */
+    public static GreenplumTableInfo getTableInfo(GreenplumSinkDTO greenplumSink,
+            List<GreenplumColumnInfo> columnList) {
+        GreenplumTableInfo tableInfo = new GreenplumTableInfo();
+        tableInfo.setTableName(greenplumSink.getTableName());
+        tableInfo.setPrimaryKey(greenplumSink.getPrimaryKey());
+        tableInfo.setUserName(greenplumSink.getUsername());
+        tableInfo.setColumns(columnList);
+        return tableInfo;
+    }
 }
+
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/greenplum/GreenplumTableInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/greenplum/GreenplumTableInfo.java
new file mode 100644
index 000000000..a24148a19
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/greenplum/GreenplumTableInfo.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sink.greenplum;
+
+import lombok.Data;
+
+import java.util.List;
+
+/**
+ * Greenplum table info.
+ */
+@Data
+public class GreenplumTableInfo {
+
+    private String tableName;
+
+    private String comment;
+
+    private String primaryKey;
+
+    private String schemaName;
+
+    private String userName;
+
+    private List<GreenplumColumnInfo> columns;
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/greenplum/GreenplumJdbcUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/greenplum/GreenplumJdbcUtils.java
new file mode 100644
index 000000000..8ef5d581a
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/greenplum/GreenplumJdbcUtils.java
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.resource.sink.greenplum;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.pojo.sink.greenplum.GreenplumColumnInfo;
+import org.apache.inlong.manager.pojo.sink.greenplum.GreenplumTableInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Utils for Greenplum JDBC.
+ */
+public class GreenplumJdbcUtils {
+
+    private static final String GREENPLUM_JDBC_PREFIX = "jdbc:postgresql";
+
+    private static final String GREENPLUM_DRIVER_CLASS = "org.postgresql.Driver";
+
+    public static final String GREENPLUM_DEFAULT_SCHEMA = "public";
+
+    private static final Logger LOG = LoggerFactory.getLogger(GreenplumJdbcUtils.class);
+
+    /**
+     * Get Greenplum connection from the url and user
+     */
+    public static Connection getConnection(final String url, final String user, final String password)
+            throws Exception {
+        if (StringUtils.isBlank(url) || !url.startsWith(GREENPLUM_JDBC_PREFIX)) {
+            throw new Exception("Greenplum server URL was invalid, it should start with jdbc:postgresql");
+        }
+        final Connection conn;
+        try {
+            Class.forName(GREENPLUM_DRIVER_CLASS);
+            conn = DriverManager.getConnection(url, user, password);
+        } catch (Exception e) {
+            final String errorMsg = "get Greenplum connection error, please check postgres jdbc url,"
+                    + " username or password!";
+            LOG.error(errorMsg, e);
+            throw new Exception(errorMsg + " other error msg: " + e.getMessage());
+        }
+        if (conn == null) {
+            throw new Exception("get Greenplum connection failed, please contact administrator");
+        }
+        LOG.info("get Greenplum connection success, url={}", url);
+        return conn;
+    }
+
+    /**
+     * Execute SQL command on Greenplum.
+     *
+     * @param conn JDBC Connection  {@link Connection}
+     * @param sql SQL String to be executed
+     * @throws Exception on execute SQL error
+     */
+    public static void executeSql(final Connection conn, final String sql) throws Exception {
+        try (Statement stmt = conn.createStatement()) {
+            stmt.execute(sql);
+            LOG.info("execute sql [{}] success", sql);
+        }
+    }
+
+    /**
+     * Execute batch query SQL on Greenplum.
+     *
+     * @param conn JDBC Connection  {@link Connection}
+     * @param sqls SQL String to be executed
+     * @throws Exception on get execute SQL batch error
+     */
+    public static void executeSqlBatch(final Connection conn, final List<String> sqls)
+            throws Exception {
+        conn.setAutoCommit(false);
+        try (Statement stmt = conn.createStatement()) {
+            for (String entry : sqls) {
+                stmt.execute(entry);
+            }
+            conn.commit();
+        } finally {
+            conn.setAutoCommit(true);
+        }
+        LOG.info("execute batch sql [{}] success", sqls);
+    }
+
+    /**
+     * Create Greenplum table by GreenplumTableInfo
+     *
+     * @param conn JDBC Connection  {@link Connection}
+     * @param tableInfo Greenplum table info  {@link GreenplumTableInfo}
+     * @throws Exception on create table error
+     */
+    public static void createTable(final Connection conn, final GreenplumTableInfo tableInfo)
+            throws Exception {
+        if (checkTablesExist(conn, tableInfo.getSchemaName(), tableInfo.getTableName())) {
+            LOG.info("the table [{}] are exists", tableInfo.getTableName());
+        } else {
+            final List<String> createTableSqls = GreenplumSqlBuilder.buildCreateTableSql(tableInfo);
+            executeSqlBatch(conn, createTableSqls);
+            LOG.info("execute sql [{}] success", createTableSqls);
+        }
+    }
+
+    /**
+     * Create Greenplum schema by schemaNama
+     *
+     * @param conn JDBC Connection  {@link Connection}
+     * @param schemaName Greenplum schema name
+     * @param userName Greenplum user name
+     * @throws Exception on create schema error
+     */
+    public static void createSchema(final Connection conn, final String schemaName, final String userName)
+            throws Exception {
+        if (checkSchemaExist(conn, schemaName)) {
+            LOG.info("the schema [{}] are exists", schemaName);
+        } else {
+            final String sql = GreenplumSqlBuilder.buildCreateSchema(schemaName, userName);
+            executeSql(conn, sql);
+            LOG.info("execute create schema sql [{}] success", sql);
+        }
+    }
+
+    /**
+     * Check tables from the Greenplum information_schema.
+     *
+     * @param conn JDBC Connection  {@link Connection}
+     * @param schemaName Greenplum database name
+     * @param tableName Greenplum table name
+     * @return true if table exist, otherwise false
+     * @throws Exception on check table exist error
+     */
+    public static boolean checkTablesExist(final Connection conn, final String schemaName, final String tableName)
+            throws Exception {
+        boolean result = false;
+        final String checkTableSql = GreenplumSqlBuilder.getCheckTable(schemaName, tableName);
+        try (Statement statement = conn.createStatement();
+                ResultSet resultSet = statement.executeQuery(checkTableSql)) {
+            if (null != resultSet && resultSet.next()) {
+                int size = resultSet.getInt(1);
+                if (size > 0) {
+                    result = true;
+                }
+            }
+        }
+        LOG.info("check table exist for username={} table={}, result={}", schemaName, tableName, result);
+        return result;
+    }
+
+    /**
+     * Check whether the column exists in the table.
+     *
+     * @param conn JDBC Connection  {@link Connection}
+     * @param schemaName Greenplum schema name
+     * @param tableName Greenplum table name
+     * @param column Greenplum table column name
+     * @return true if column exist in the table, otherwise false
+     * @throws Exception on check column exist error
+     */
+    public static boolean checkColumnExist(final Connection conn, final String schemaName, final String tableName,
+            final String column) throws Exception {
+        boolean result = false;
+        final String checkColumnSql = GreenplumSqlBuilder.getCheckColumn(schemaName, tableName, column);
+        try (Statement statement = conn.createStatement();
+                ResultSet resultSet = statement.executeQuery(checkColumnSql)) {
+            if (Objects.nonNull(resultSet) && resultSet.next()) {
+                int count = resultSet.getInt(1);
+                if (count > 0) {
+                    result = true;
+                }
+            }
+        }
+        LOG.info("check column exist for table={}, column={}, result={}", tableName, column, result);
+        return result;
+    }
+
+    /**
+     * Check whether the schema exists.
+     *
+     * @param conn JDBC Connection  {@link Connection}
+     * @param schemaName Greenplum schema name
+     * @return true if schema exist in the table, otherwise false
+     * @throws Exception on check column exist error
+     */
+    public static boolean checkSchemaExist(final Connection conn, final String schemaName) throws Exception {
+        boolean result = false;
+        if (GREENPLUM_DEFAULT_SCHEMA.equals(schemaName)) {
+            result = true;
+        } else {
+            final String checkColumnSql = GreenplumSqlBuilder.getCheckSchema(schemaName);
+            try (Statement statement = conn.createStatement();
+                    ResultSet resultSet = statement.executeQuery(checkColumnSql)) {
+                if (Objects.nonNull(resultSet) && resultSet.next()) {
+                    int count = resultSet.getInt(1);
+                    if (count > 0) {
+                        result = true;
+                    }
+                }
+            }
+        }
+        LOG.info("check schema exist for schema={}, result={}", schemaName, result);
+        return result;
+    }
+
+    /**
+     * Query all columns of the tableName.
+     *
+     * @param conn JDBC Connection  {@link Connection}
+     * @param schemaName Greenplum schema name
+     * @param tableName Greenplum table name
+     * @return {@link List}
+     * @throws Exception on get columns error
+     */
+    public static List<GreenplumColumnInfo> getColumns(final Connection conn, final String schemaName,
+            final String tableName) throws Exception {
+        final List<GreenplumColumnInfo> columnList = new ArrayList<>();
+        final String querySql = GreenplumSqlBuilder.buildDescTableSql(schemaName, tableName);
+
+        try (Statement stmt = conn.createStatement();
+                ResultSet rs = stmt.executeQuery(querySql)) {
+            while (rs.next()) {
+                columnList.add(new GreenplumColumnInfo(rs.getString(1), rs.getString(2),
+                        rs.getString(3)));
+            }
+        }
+        return columnList;
+    }
+
+    /**
+     * Add columns for Greenpluum table.
+     *
+     * @param conn JDBC Connection  {@link Connection}
+     * @param schemaName Greenpluum schema name
+     * @param tableName Greenpluum table name
+     * @param columns Greenpluum columns to be added
+     * @throws Exception on add columns error
+     */
+    public static void addColumns(final Connection conn, final String schemaName, final String tableName,
+            final List<GreenplumColumnInfo> columns) throws Exception {
+        final List<GreenplumColumnInfo> columnInfos = new ArrayList<>();
+
+        for (GreenplumColumnInfo columnInfo : columns) {
+            if (!checkColumnExist(conn, schemaName, tableName, columnInfo.getName())) {
+                columnInfos.add(columnInfo);
+            }
+        }
+        final List<String> addColumnSql = GreenplumSqlBuilder.buildAddColumnsSql(schemaName, tableName, columnInfos);
+        executeSqlBatch(conn, addColumnSql);
+        LOG.info("execute add columns sql [{}] success", addColumnSql);
+    }
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/greenplum/GreenplumResourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/greenplum/GreenplumResourceOperator.java
new file mode 100644
index 000000000..a621b3a03
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/greenplum/GreenplumResourceOperator.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.resource.sink.greenplum;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.common.enums.SinkStatus;
+import org.apache.inlong.manager.common.exceptions.WorkflowException;
+import org.apache.inlong.manager.dao.entity.StreamSinkFieldEntity;
+import org.apache.inlong.manager.dao.mapper.StreamSinkFieldEntityMapper;
+import org.apache.inlong.manager.pojo.sink.SinkInfo;
+import org.apache.inlong.manager.pojo.sink.greenplum.GreenplumColumnInfo;
+import org.apache.inlong.manager.pojo.sink.greenplum.GreenplumSinkDTO;
+import org.apache.inlong.manager.pojo.sink.greenplum.GreenplumTableInfo;
+import org.apache.inlong.manager.service.resource.sink.SinkResourceOperator;
+import org.apache.inlong.manager.service.sink.StreamSinkService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+
+import java.sql.Connection;
+import java.util.List;
+
+public class GreenplumResourceOperator implements SinkResourceOperator {
+
+    private static final Logger LOG = LoggerFactory.getLogger(GreenplumResourceOperator.class);
+
+    public static final String GREENPLUM_DEFAULT_SCHEMA = "public";
+
+    @Autowired
+    private StreamSinkService sinkService;
+
+    @Autowired
+    private StreamSinkFieldEntityMapper fieldEntityMapper;
+
+    @Override
+    public Boolean accept(String sinkType) {
+        return SinkType.GREENPLUM.equals(sinkType);
+    }
+
+    @Override
+    public void createSinkResource(SinkInfo sinkInfo) {
+        LOG.info("begin to create Greenplum resources sinkId={}", sinkInfo.getId());
+        if (SinkStatus.CONFIG_SUCCESSFUL.getCode().equals(sinkInfo.getStatus())) {
+            LOG.warn("Greenplum resource [" + sinkInfo.getId() + "] already success, skip to create");
+            return;
+        } else if (InlongConstants.DISABLE_CREATE_RESOURCE.equals(sinkInfo.getEnableCreateResource())) {
+            LOG.warn("create resource was disabled, skip to create for [" + sinkInfo.getId() + "]");
+            return;
+        }
+        this.createTable(sinkInfo);
+    }
+
+    /**
+     * Create Greenplum table by SinkInfo.
+     *
+     * @param sinkInfo {@link SinkInfo}
+     */
+    private void createTable(SinkInfo sinkInfo) {
+        LOG.info("begin to create Greenplum table for sinkId={}", sinkInfo.getId());
+        List<StreamSinkFieldEntity> fieldList = fieldEntityMapper.selectBySinkId(sinkInfo.getId());
+        if (CollectionUtils.isEmpty(fieldList)) {
+            LOG.warn("no Greenplum fields found, skip to create table for sinkId={}", sinkInfo.getId());
+        }
+        // set columns
+        final List<GreenplumColumnInfo> columnList = Lists.newArrayList();
+        fieldList.forEach(field -> {
+            columnList.add(
+                    new GreenplumColumnInfo(field.getFieldName(), field.getFieldType(), field.getFieldComment())
+            );
+        });
+
+        GreenplumSinkDTO greenplumSink = GreenplumSinkDTO.getFromJson(sinkInfo.getExtParams());
+        GreenplumTableInfo tableInfo = GreenplumSinkDTO.getTableInfo(greenplumSink, columnList);
+        if (StringUtils.isEmpty(tableInfo.getSchemaName())) {
+            tableInfo.setSchemaName(GREENPLUM_DEFAULT_SCHEMA);
+        }
+        try (Connection conn = GreenplumJdbcUtils.getConnection(greenplumSink.getJdbcUrl(), greenplumSink.getUsername(),
+                greenplumSink.getPassword())) {
+            // 1.If schema not exists,create it
+            GreenplumJdbcUtils.createSchema(conn, tableInfo.getTableName(), tableInfo.getUserName());
+            // 2.If table not exists, create it
+            GreenplumJdbcUtils.createTable(conn, tableInfo);
+            // 3.Table exists, add columns - skip the exists columns
+            GreenplumJdbcUtils.addColumns(conn, tableInfo.getSchemaName(), tableInfo.getTableName(), columnList);
+            // 4.Update the sink status to success
+            final String info = "success to create Greenplum resource";
+            sinkService.updateStatus(sinkInfo.getId(), SinkStatus.CONFIG_SUCCESSFUL.getCode(), info);
+            LOG.info(info + " for sinkInfo={}", sinkInfo);
+            // 4. close connection.
+        } catch (Throwable e) {
+            String errMsg = "create Greenplum table failed: " + e.getMessage();
+            LOG.error(errMsg, e);
+            sinkService.updateStatus(sinkInfo.getId(), SinkStatus.CONFIG_FAILED.getCode(), errMsg);
+            throw new WorkflowException(errMsg);
+        }
+        LOG.info("success create Greenplum table for data sink [" + sinkInfo.getId() + "]");
+    }
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/greenplum/GreenplumSqlBuilder.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/greenplum/GreenplumSqlBuilder.java
new file mode 100644
index 000000000..257397b57
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/greenplum/GreenplumSqlBuilder.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.resource.sink.greenplum;
+
+import org.apache.commons.compress.utils.Lists;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.pojo.sink.greenplum.GreenplumColumnInfo;
+import org.apache.inlong.manager.pojo.sink.greenplum.GreenplumTableInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class GreenplumSqlBuilder {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(GreenplumSqlBuilder.class);
+
+    /**
+     * Build SQL to check whether the table exists.
+     *
+     * @param schemaName Greenplum schema name
+     * @param tableName Greenplum table name
+     * @return the check table SQL string
+     */
+    public static String getCheckTable(final String schemaName, final String tableName) {
+        final StringBuilder sqlBuilder = new StringBuilder()
+                .append("SELECT COUNT(*) FROM INFORMATION_SCHEMA.TABLES  WHERE TABLE_SCHEMA = '")
+                .append(schemaName)
+                .append("' AND TABLE_TYPE = 'BASE TABLE' ")
+                .append(" AND TABLE_NAME = '")
+                .append(tableName)
+                .append("' ;");
+        LOGGER.info("check table sql: {}", sqlBuilder);
+        return sqlBuilder.toString();
+    }
+
+    /**
+     * Build SQL to check whether the schema exists.
+     *
+     * @param schemaName
+     * @return
+     */
+    public static String getCheckSchema(final String schemaName) {
+        return new StringBuilder()
+                .append("SELECT COUNT(1) FROM INFORMATION_SCHEMA.SCHEMATA ")
+                .append(" WHERE SCHEMA_NAME = \'")
+                .append(schemaName)
+                .append("\';")
+                .toString();
+    }
+
+    /**
+     * Build create Greenplum schema SQL String
+     *
+     * @param schemaName schema name
+     * @param user user name
+     * @return SQL String
+     */
+    public static String buildCreateSchema(final String schemaName, final String user) {
+        return new StringBuilder()
+                .append(" CREATE SCHEMA \"")
+                .append(schemaName)
+                .append("\" AUTHORIZATION \"")
+                .append(user)
+                .append("\";")
+                .toString();
+    }
+
+    /**
+     * Build SQL to check whether the column exists.
+     *
+     * @param schemaName Greenplum table name
+     * @param columnName Greenplum column name
+     * @return the check column SQL string
+     */
+    public static String getCheckColumn(final String schemaName, final String tableName, final String columnName) {
+        final StringBuilder sqlBuilder = new StringBuilder()
+                .append("SELECT COUNT(*) FROM INFORMATION_SCHEMA.COLUMNS  WHERE TABLE_SCHEMA = '")
+                .append(schemaName)
+                .append("' AND TABLE_NAME = '")
+                .append(tableName)
+                .append("' AND COLUMN_NAME = '")
+                .append(columnName)
+                .append("' ;");
+        LOGGER.info("check table sql: {}", sqlBuilder);
+        return sqlBuilder.toString();
+    }
+
+    /**
+     * Build create table SQL by GreenplumTableInfo.
+     *
+     * @param table Greenplum table info {@link GreenplumTableInfo}
+     * @return the create table SQL String
+     */
+    public static List<String> buildCreateTableSql(final GreenplumTableInfo table) {
+        final List<String> sqls = Lists.newArrayList();
+        final StringBuilder createSql = new StringBuilder()
+                .append("CREATE TABLE ").append(table.getSchemaName())
+                .append(".\"")
+                .append(table.getTableName())
+                .append("\"")
+                .append(buildCreateColumnsSql(table));
+        sqls.add(createSql.toString());
+
+        // column comments
+        sqls.addAll(getColumnsComment(table.getSchemaName(), table.getTableName(), table.getColumns()));
+        // table comment
+        if (StringUtils.isNotEmpty(table.getComment())) {
+            sqls.add(getTableComment(table));
+        }
+        LOGGER.info("create table sql : {}", sqls);
+        return sqls;
+    }
+
+    /**
+     * Build add columns SQL.
+     *
+     * @param schemaName Greenplum schema name
+     * @param tableName Greenplum table name
+     * @param columnList Greenplum column list {@link List}
+     * @return add column SQL string list
+     */
+    public static List<String> buildAddColumnsSql(final String schemaName, final String tableName,
+            List<GreenplumColumnInfo> columnList) {
+        final List<String> resultList = Lists.newArrayList();
+        final StringBuilder sqlBuilder = new StringBuilder();
+
+        columnList.forEach(columnInfo -> {
+            sqlBuilder.append("ALTER TABLE \"")
+                    .append(schemaName)
+                    .append("\".\"")
+                    .append(tableName)
+                    .append("\" ADD \"")
+                    .append(columnInfo.getName())
+                    .append("\" ")
+                    .append(columnInfo.getType())
+                    .append(" ");
+            resultList.add(sqlBuilder.toString());
+            sqlBuilder.delete(0, sqlBuilder.length());
+        });
+        resultList.addAll(getColumnsComment(schemaName, tableName, columnList));
+        LOGGER.info("add columns sql={}", resultList);
+        return resultList;
+    }
+
+    /**
+     * Build create column SQL.
+     *
+     * @param table Greenplum table info {@link GreenplumTableInfo}
+     * @return create column SQL string
+     */
+    private static String buildCreateColumnsSql(final GreenplumTableInfo table) {
+        final List<String> columnList = getColumnsInfo(table.getColumns());
+        final StringBuilder sql = new StringBuilder()
+                .append(" (")
+                .append(StringUtils.join(columnList, ","));
+        if (!StringUtils.isEmpty(table.getPrimaryKey())) {
+            sql.append(", PRIMARY KEY (")
+                    .append(table.getPrimaryKey())
+                    .append(")");
+        }
+        sql.append(") ");
+        return sql.toString();
+    }
+
+    /**
+     * Build column info by GreenplumColumnInfo list.
+     *
+     * @param columns Greenplum column info {@link GreenplumColumnInfo} list
+     * @return the SQL list
+     */
+    private static List<String> getColumnsInfo(final List<GreenplumColumnInfo> columns) {
+        final List<String> columnList = new ArrayList<>();
+        final StringBuilder columnBuilder = new StringBuilder();
+
+        columns.forEach(columnInfo -> {
+            columnBuilder.append("\"")
+                    .append(columnInfo.getName())
+                    .append("\" ")
+                    .append(columnInfo.getType());
+            columnList.add(columnBuilder.toString());
+            columnBuilder.delete(0, columnBuilder.length());
+        });
+        return columnList;
+    }
+
+    /**
+     * Build columns comment SQLs
+     *
+     * @param tableName Greenplum table name
+     * @param columns Greenplum colum list {@link GreenplumColumnInfo}
+     * @return the SQL String list
+     */
+    private static List<String> getColumnsComment(final String schemaName, final String tableName,
+            List<GreenplumColumnInfo> columns) {
+        final List<String> commentList = new ArrayList<>();
+        for (GreenplumColumnInfo columnInfo : columns) {
+            if (StringUtils.isNoneBlank(columnInfo.getComment())) {
+                StringBuilder commSql = new StringBuilder();
+                commSql.append("COMMENT ON COLUMN \"")
+                        .append(schemaName)
+                        .append("\".\"")
+                        .append(tableName)
+                        .append("\".\"")
+                        .append(columnInfo.getName())
+                        .append("\" IS \'")
+                        .append(columnInfo.getComment())
+                        .append("\' ;");
+                commentList.add(commSql.toString());
+            }
+        }
+        return commentList;
+    }
+
+    /**
+     * Build table comment SQL
+     *
+     * @param tableInfo Greenplum table info {@link GreenplumTableInfo}
+     * @return the SQL String
+     */
+    private static String getTableComment(final GreenplumTableInfo tableInfo) {
+        return new StringBuilder()
+                .append("COMMENT ON TABLE \"")
+                .append(tableInfo.getSchemaName())
+                .append("\".\"")
+                .append(tableInfo.getTableName())
+                .append("\" IS \'")
+                .append(tableInfo.getComment())
+                .append("\';")
+                .toString();
+    }
+
+    /**
+     * Build query table's all cloumn SQL.
+     *
+     * @param schemaName Greenplum schema name
+     * @param tableName Greenplum table name
+     * @return desc table SQL string
+     */
+    public static String buildDescTableSql(final String schemaName, final String tableName) {
+        StringBuilder sql = new StringBuilder().append(
+                        "SELECT A.COLUMN_NAME,A.UDT_NAME,C.DESCRIPTION FROM INFORMATION_SCHEMA.COLUMNS A")
+                .append(" LEFT JOIN   (SELECT PC.OID AS OOID,PN.NSPNAME,PC.RELNAME")
+                .append(" FROM PG_CLASS PC LEFT OUTER JOIN PG_NAMESPACE PN ON PC.RELNAMESPACE = PN.OID ")
+                .append(" WHERE PN.NSPNAME ='")
+                .append(schemaName)
+                .append("' AND PC.RELNAME = '")
+                .append(tableName)
+                .append("') B   ON A.TABLE_SCHEMA = B.NSPNAME AND A.TABLE_NAME = B.RELNAME")
+                .append(" LEFT JOIN PG_CATALOG.PG_DESCRIPTION C ")
+                .append("ON B.OOID = C.OBJOID AND A.ORDINAL_POSITION = C.OBJSUBID")
+                .append(" WHERE A.TABLE_SCHEMA = '")
+                .append(schemaName)
+                .append("' AND A.TABLE_NAME = '")
+                .append(tableName)
+                .append("'  ORDER BY  C.OBJSUBID ;");
+        LOGGER.info("desc table sql={}", sql);
+        return sql.toString();
+    }
+}
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/GreenplumSinkServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/GreenplumSinkServiceTest.java
index 50bcc9de4..6d63484e7 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/GreenplumSinkServiceTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/GreenplumSinkServiceTest.java
@@ -17,19 +17,25 @@
 
 package org.apache.inlong.manager.service.sink;
 
+import com.google.common.collect.Lists;
 import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.consts.SinkType;
 import org.apache.inlong.manager.pojo.sink.SinkField;
 import org.apache.inlong.manager.pojo.sink.SinkRequest;
 import org.apache.inlong.manager.pojo.sink.StreamSink;
+import org.apache.inlong.manager.pojo.sink.greenplum.GreenplumColumnInfo;
 import org.apache.inlong.manager.pojo.sink.greenplum.GreenplumSink;
 import org.apache.inlong.manager.pojo.sink.greenplum.GreenplumSinkRequest;
+import org.apache.inlong.manager.pojo.sink.greenplum.GreenplumTableInfo;
 import org.apache.inlong.manager.service.ServiceBaseTest;
 import org.apache.inlong.manager.service.core.impl.InlongStreamServiceTest;
+import org.apache.inlong.manager.service.resource.sink.greenplum.GreenplumJdbcUtils;
 import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.springframework.beans.factory.annotation.Autowired;
 
+import java.sql.Connection;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -109,4 +115,65 @@ public class GreenplumSinkServiceTest extends ServiceBaseTest {
         deleteSink(sinkId);
     }
 
+
+    /**
+     * Just using in local test.
+     */
+    @Disabled
+    public void testDbResource() {
+        final String url = "jdbc:postgresql://127.0.0.1:5432/testdb";
+        final String username = "test";
+        final String password = "123456";
+        final String tableName = "test02";
+        final String schemaName = "public";
+
+        try (Connection connection = GreenplumJdbcUtils.getConnection(url, username, password)) {
+            GreenplumTableInfo tableInfo = bulidTestGreenplumTableInfo(username, schemaName, tableName);
+            GreenplumJdbcUtils.createTable(connection, tableInfo);
+            List<GreenplumColumnInfo> addColumns = buildAddColumns();
+            GreenplumJdbcUtils.addColumns(connection, schemaName, tableName, addColumns);
+            List<GreenplumColumnInfo> columns = GreenplumJdbcUtils.getColumns(connection, schemaName, tableName);
+            Assertions.assertEquals(columns.size(), tableInfo.getColumns().size() + addColumns.size());
+        } catch (Exception e) {
+            // print to local console
+            e.printStackTrace();
+        }
+    }
+
+    /**
+     * Build add Greenplum column info.
+     *
+     * @return {@link List}
+     */
+    private List<GreenplumColumnInfo> buildAddColumns() {
+        List<GreenplumColumnInfo> addColums = Lists.newArrayList(
+                new GreenplumColumnInfo("test1", "int", "test1"),
+                new GreenplumColumnInfo("test2", "varchar(30)", "test2"),
+                new GreenplumColumnInfo("Test1", "varchar(50)", "Test1")
+        );
+        return addColums;
+    }
+
+    /**
+     * Build test Greenplum table info.
+     *
+     * @param userName Greenplum database name
+     * @param tableName Greenplum table name
+     * @return {@link GreenplumTableInfo}
+     */
+    private GreenplumTableInfo bulidTestGreenplumTableInfo(final String userName, final String schemaName,
+            final String tableName) {
+        List<GreenplumColumnInfo> columns = Lists.newArrayList(
+                new GreenplumColumnInfo("id", "int", "id"),
+                new GreenplumColumnInfo("cell", "varchar(25)", "cell"),
+                new GreenplumColumnInfo("name", "varchar(50)", "name")
+        );
+        final GreenplumTableInfo tableInfo = new GreenplumTableInfo();
+        tableInfo.setColumns(columns);
+        tableInfo.setTableName(tableName);
+        tableInfo.setPrimaryKey("id");
+        tableInfo.setSchemaName(schemaName);
+        tableInfo.setComment(tableName);
+        return tableInfo;
+    }
 }