You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/08/02 08:18:58 UTC
[inlong] branch master updated: [INLONG-5084][Manager] Support create Greenplum databases and tables (#5317)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new c4663e90c [INLONG-5084][Manager] Support create Greenplum databases and tables (#5317)
c4663e90c is described below
commit c4663e90c4d47e10412a892789b18c0e88f6b62e
Author: haibo.duan <dh...@live.cn>
AuthorDate: Tue Aug 2 16:18:54 2022 +0800
[INLONG-5084][Manager] Support create Greenplum databases and tables (#5317)
---
.../pojo/sink/greenplum/GreenplumColumnInfo.java | 37 +++
.../pojo/sink/greenplum/GreenplumSinkDTO.java | 18 ++
.../pojo/sink/greenplum/GreenplumTableInfo.java | 41 +++
.../sink/greenplum/GreenplumJdbcUtils.java | 271 ++++++++++++++++++++
.../sink/greenplum/GreenplumResourceOperator.java | 117 +++++++++
.../sink/greenplum/GreenplumSqlBuilder.java | 276 +++++++++++++++++++++
.../service/sink/GreenplumSinkServiceTest.java | 67 +++++
7 files changed, 827 insertions(+)
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/greenplum/GreenplumColumnInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/greenplum/GreenplumColumnInfo.java
new file mode 100644
index 000000000..24d18124d
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/greenplum/GreenplumColumnInfo.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sink.greenplum;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * Greenplum column info.
+ */
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class GreenplumColumnInfo {
+
+ private String name;
+
+ private String type;
+
+ private String comment;
+}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/greenplum/GreenplumSinkDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/greenplum/GreenplumSinkDTO.java
index 7b073154c..89571e84b 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/greenplum/GreenplumSinkDTO.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/greenplum/GreenplumSinkDTO.java
@@ -28,6 +28,7 @@ import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import javax.validation.constraints.NotNull;
+import java.util.List;
import java.util.Map;
/**
@@ -85,4 +86,21 @@ public class GreenplumSinkDTO {
}
}
+ /**
+ * Get Greenplum table info
+ *
+ * @param greenplumSink Greenplum sink dto,{@link GreenplumSinkDTO}
+ * @param columnList Greenplum column info list,{@link GreenplumColumnInfo}
+ * @return {@link GreenplumTableInfo}
+ */
+ public static GreenplumTableInfo getTableInfo(GreenplumSinkDTO greenplumSink,
+ List<GreenplumColumnInfo> columnList) {
+ GreenplumTableInfo tableInfo = new GreenplumTableInfo();
+ tableInfo.setTableName(greenplumSink.getTableName());
+ tableInfo.setPrimaryKey(greenplumSink.getPrimaryKey());
+ tableInfo.setUserName(greenplumSink.getUsername());
+ tableInfo.setColumns(columnList);
+ return tableInfo;
+ }
}
+
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/greenplum/GreenplumTableInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/greenplum/GreenplumTableInfo.java
new file mode 100644
index 000000000..a24148a19
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/greenplum/GreenplumTableInfo.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sink.greenplum;
+
+import lombok.Data;
+
+import java.util.List;
+
+/**
+ * Greenplum table info.
+ */
+@Data
+public class GreenplumTableInfo {
+
+ private String tableName;
+
+ private String comment;
+
+ private String primaryKey;
+
+ private String schemaName;
+
+ private String userName;
+
+ private List<GreenplumColumnInfo> columns;
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/greenplum/GreenplumJdbcUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/greenplum/GreenplumJdbcUtils.java
new file mode 100644
index 000000000..8ef5d581a
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/greenplum/GreenplumJdbcUtils.java
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.resource.sink.greenplum;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.pojo.sink.greenplum.GreenplumColumnInfo;
+import org.apache.inlong.manager.pojo.sink.greenplum.GreenplumTableInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Utils for Greenplum JDBC.
+ */
+public class GreenplumJdbcUtils {
+
+ private static final String GREENPLUM_JDBC_PREFIX = "jdbc:postgresql";
+
+ private static final String GREENPLUM_DRIVER_CLASS = "org.postgresql.Driver";
+
+ public static final String GREENPLUM_DEFAULT_SCHEMA = "public";
+
+ private static final Logger LOG = LoggerFactory.getLogger(GreenplumJdbcUtils.class);
+
+ /**
+ * Get Greenplum connection from the url and user
+ */
+ public static Connection getConnection(final String url, final String user, final String password)
+ throws Exception {
+ if (StringUtils.isBlank(url) || !url.startsWith(GREENPLUM_JDBC_PREFIX)) {
+ throw new Exception("Greenplum server URL was invalid, it should start with jdbc:postgresql");
+ }
+ final Connection conn;
+ try {
+ Class.forName(GREENPLUM_DRIVER_CLASS);
+ conn = DriverManager.getConnection(url, user, password);
+ } catch (Exception e) {
+ final String errorMsg = "get Greenplum connection error, please check postgres jdbc url,"
+ + " username or password!";
+ LOG.error(errorMsg, e);
+ throw new Exception(errorMsg + " other error msg: " + e.getMessage());
+ }
+ if (conn == null) {
+ throw new Exception("get Greenplum connection failed, please contact administrator");
+ }
+ LOG.info("get Greenplum connection success, url={}", url);
+ return conn;
+ }
+
+ /**
+ * Execute SQL command on Greenplum.
+ *
+ * @param conn JDBC Connection {@link Connection}
+ * @param sql SQL String to be executed
+ * @throws Exception on execute SQL error
+ */
+ public static void executeSql(final Connection conn, final String sql) throws Exception {
+ try (Statement stmt = conn.createStatement()) {
+ stmt.execute(sql);
+ LOG.info("execute sql [{}] success", sql);
+ }
+ }
+
+ /**
+ * Execute batch query SQL on Greenplum.
+ *
+ * @param conn JDBC Connection {@link Connection}
+ * @param sqls SQL String to be executed
+ * @throws Exception on get execute SQL batch error
+ */
+ public static void executeSqlBatch(final Connection conn, final List<String> sqls)
+ throws Exception {
+ conn.setAutoCommit(false);
+ try (Statement stmt = conn.createStatement()) {
+ for (String entry : sqls) {
+ stmt.execute(entry);
+ }
+ conn.commit();
+ } finally {
+ conn.setAutoCommit(true);
+ }
+ LOG.info("execute batch sql [{}] success", sqls);
+ }
+
+ /**
+ * Create Greenplum table by GreenplumTableInfo
+ *
+ * @param conn JDBC Connection {@link Connection}
+ * @param tableInfo Greenplum table info {@link GreenplumTableInfo}
+ * @throws Exception on create table error
+ */
+ public static void createTable(final Connection conn, final GreenplumTableInfo tableInfo)
+ throws Exception {
+ if (checkTablesExist(conn, tableInfo.getSchemaName(), tableInfo.getTableName())) {
+ LOG.info("the table [{}] are exists", tableInfo.getTableName());
+ } else {
+ final List<String> createTableSqls = GreenplumSqlBuilder.buildCreateTableSql(tableInfo);
+ executeSqlBatch(conn, createTableSqls);
+ LOG.info("execute sql [{}] success", createTableSqls);
+ }
+ }
+
+ /**
+ * Create Greenplum schema by schemaNama
+ *
+ * @param conn JDBC Connection {@link Connection}
+ * @param schemaName Greenplum schema name
+ * @param userName Greenplum user name
+ * @throws Exception on create schema error
+ */
+ public static void createSchema(final Connection conn, final String schemaName, final String userName)
+ throws Exception {
+ if (checkSchemaExist(conn, schemaName)) {
+ LOG.info("the schema [{}] are exists", schemaName);
+ } else {
+ final String sql = GreenplumSqlBuilder.buildCreateSchema(schemaName, userName);
+ executeSql(conn, sql);
+ LOG.info("execute create schema sql [{}] success", sql);
+ }
+ }
+
+ /**
+ * Check tables from the Greenplum information_schema.
+ *
+ * @param conn JDBC Connection {@link Connection}
+ * @param schemaName Greenplum database name
+ * @param tableName Greenplum table name
+ * @return true if table exist, otherwise false
+ * @throws Exception on check table exist error
+ */
+ public static boolean checkTablesExist(final Connection conn, final String schemaName, final String tableName)
+ throws Exception {
+ boolean result = false;
+ final String checkTableSql = GreenplumSqlBuilder.getCheckTable(schemaName, tableName);
+ try (Statement statement = conn.createStatement();
+ ResultSet resultSet = statement.executeQuery(checkTableSql)) {
+ if (null != resultSet && resultSet.next()) {
+ int size = resultSet.getInt(1);
+ if (size > 0) {
+ result = true;
+ }
+ }
+ }
+ LOG.info("check table exist for username={} table={}, result={}", schemaName, tableName, result);
+ return result;
+ }
+
+ /**
+ * Check whether the column exists in the table.
+ *
+ * @param conn JDBC Connection {@link Connection}
+ * @param schemaName Greenplum schema name
+ * @param tableName Greenplum table name
+ * @param column Greenplum table column name
+ * @return true if column exist in the table, otherwise false
+ * @throws Exception on check column exist error
+ */
+ public static boolean checkColumnExist(final Connection conn, final String schemaName, final String tableName,
+ final String column) throws Exception {
+ boolean result = false;
+ final String checkColumnSql = GreenplumSqlBuilder.getCheckColumn(schemaName, tableName, column);
+ try (Statement statement = conn.createStatement();
+ ResultSet resultSet = statement.executeQuery(checkColumnSql)) {
+ if (Objects.nonNull(resultSet) && resultSet.next()) {
+ int count = resultSet.getInt(1);
+ if (count > 0) {
+ result = true;
+ }
+ }
+ }
+ LOG.info("check column exist for table={}, column={}, result={}", tableName, column, result);
+ return result;
+ }
+
+ /**
+ * Check whether the schema exists.
+ *
+ * @param conn JDBC Connection {@link Connection}
+ * @param schemaName Greenplum schema name
+ * @return true if schema exist in the table, otherwise false
+ * @throws Exception on check column exist error
+ */
+ public static boolean checkSchemaExist(final Connection conn, final String schemaName) throws Exception {
+ boolean result = false;
+ if (GREENPLUM_DEFAULT_SCHEMA.equals(schemaName)) {
+ result = true;
+ } else {
+ final String checkColumnSql = GreenplumSqlBuilder.getCheckSchema(schemaName);
+ try (Statement statement = conn.createStatement();
+ ResultSet resultSet = statement.executeQuery(checkColumnSql)) {
+ if (Objects.nonNull(resultSet) && resultSet.next()) {
+ int count = resultSet.getInt(1);
+ if (count > 0) {
+ result = true;
+ }
+ }
+ }
+ }
+ LOG.info("check schema exist for schema={}, result={}", schemaName, result);
+ return result;
+ }
+
+ /**
+ * Query all columns of the tableName.
+ *
+ * @param conn JDBC Connection {@link Connection}
+ * @param schemaName Greenplum schema name
+ * @param tableName Greenplum table name
+ * @return {@link List}
+ * @throws Exception on get columns error
+ */
+ public static List<GreenplumColumnInfo> getColumns(final Connection conn, final String schemaName,
+ final String tableName) throws Exception {
+ final List<GreenplumColumnInfo> columnList = new ArrayList<>();
+ final String querySql = GreenplumSqlBuilder.buildDescTableSql(schemaName, tableName);
+
+ try (Statement stmt = conn.createStatement();
+ ResultSet rs = stmt.executeQuery(querySql)) {
+ while (rs.next()) {
+ columnList.add(new GreenplumColumnInfo(rs.getString(1), rs.getString(2),
+ rs.getString(3)));
+ }
+ }
+ return columnList;
+ }
+
+ /**
+ * Add columns for Greenpluum table.
+ *
+ * @param conn JDBC Connection {@link Connection}
+ * @param schemaName Greenpluum schema name
+ * @param tableName Greenpluum table name
+ * @param columns Greenpluum columns to be added
+ * @throws Exception on add columns error
+ */
+ public static void addColumns(final Connection conn, final String schemaName, final String tableName,
+ final List<GreenplumColumnInfo> columns) throws Exception {
+ final List<GreenplumColumnInfo> columnInfos = new ArrayList<>();
+
+ for (GreenplumColumnInfo columnInfo : columns) {
+ if (!checkColumnExist(conn, schemaName, tableName, columnInfo.getName())) {
+ columnInfos.add(columnInfo);
+ }
+ }
+ final List<String> addColumnSql = GreenplumSqlBuilder.buildAddColumnsSql(schemaName, tableName, columnInfos);
+ executeSqlBatch(conn, addColumnSql);
+ LOG.info("execute add columns sql [{}] success", addColumnSql);
+ }
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/greenplum/GreenplumResourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/greenplum/GreenplumResourceOperator.java
new file mode 100644
index 000000000..a621b3a03
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/greenplum/GreenplumResourceOperator.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.resource.sink.greenplum;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.common.enums.SinkStatus;
+import org.apache.inlong.manager.common.exceptions.WorkflowException;
+import org.apache.inlong.manager.dao.entity.StreamSinkFieldEntity;
+import org.apache.inlong.manager.dao.mapper.StreamSinkFieldEntityMapper;
+import org.apache.inlong.manager.pojo.sink.SinkInfo;
+import org.apache.inlong.manager.pojo.sink.greenplum.GreenplumColumnInfo;
+import org.apache.inlong.manager.pojo.sink.greenplum.GreenplumSinkDTO;
+import org.apache.inlong.manager.pojo.sink.greenplum.GreenplumTableInfo;
+import org.apache.inlong.manager.service.resource.sink.SinkResourceOperator;
+import org.apache.inlong.manager.service.sink.StreamSinkService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+
+import java.sql.Connection;
+import java.util.List;
+
+public class GreenplumResourceOperator implements SinkResourceOperator {
+
+ private static final Logger LOG = LoggerFactory.getLogger(GreenplumResourceOperator.class);
+
+ public static final String GREENPLUM_DEFAULT_SCHEMA = "public";
+
+ @Autowired
+ private StreamSinkService sinkService;
+
+ @Autowired
+ private StreamSinkFieldEntityMapper fieldEntityMapper;
+
+ @Override
+ public Boolean accept(String sinkType) {
+ return SinkType.GREENPLUM.equals(sinkType);
+ }
+
+ @Override
+ public void createSinkResource(SinkInfo sinkInfo) {
+ LOG.info("begin to create Greenplum resources sinkId={}", sinkInfo.getId());
+ if (SinkStatus.CONFIG_SUCCESSFUL.getCode().equals(sinkInfo.getStatus())) {
+ LOG.warn("Greenplum resource [" + sinkInfo.getId() + "] already success, skip to create");
+ return;
+ } else if (InlongConstants.DISABLE_CREATE_RESOURCE.equals(sinkInfo.getEnableCreateResource())) {
+ LOG.warn("create resource was disabled, skip to create for [" + sinkInfo.getId() + "]");
+ return;
+ }
+ this.createTable(sinkInfo);
+ }
+
+ /**
+ * Create Greenplum table by SinkInfo.
+ *
+ * @param sinkInfo {@link SinkInfo}
+ */
+ private void createTable(SinkInfo sinkInfo) {
+ LOG.info("begin to create Greenplum table for sinkId={}", sinkInfo.getId());
+ List<StreamSinkFieldEntity> fieldList = fieldEntityMapper.selectBySinkId(sinkInfo.getId());
+ if (CollectionUtils.isEmpty(fieldList)) {
+ LOG.warn("no Greenplum fields found, skip to create table for sinkId={}", sinkInfo.getId());
+ }
+ // set columns
+ final List<GreenplumColumnInfo> columnList = Lists.newArrayList();
+ fieldList.forEach(field -> {
+ columnList.add(
+ new GreenplumColumnInfo(field.getFieldName(), field.getFieldType(), field.getFieldComment())
+ );
+ });
+
+ GreenplumSinkDTO greenplumSink = GreenplumSinkDTO.getFromJson(sinkInfo.getExtParams());
+ GreenplumTableInfo tableInfo = GreenplumSinkDTO.getTableInfo(greenplumSink, columnList);
+ if (StringUtils.isEmpty(tableInfo.getSchemaName())) {
+ tableInfo.setSchemaName(GREENPLUM_DEFAULT_SCHEMA);
+ }
+ try (Connection conn = GreenplumJdbcUtils.getConnection(greenplumSink.getJdbcUrl(), greenplumSink.getUsername(),
+ greenplumSink.getPassword())) {
+ // 1.If schema not exists,create it
+ GreenplumJdbcUtils.createSchema(conn, tableInfo.getTableName(), tableInfo.getUserName());
+ // 2.If table not exists, create it
+ GreenplumJdbcUtils.createTable(conn, tableInfo);
+ // 3.Table exists, add columns - skip the exists columns
+ GreenplumJdbcUtils.addColumns(conn, tableInfo.getSchemaName(), tableInfo.getTableName(), columnList);
+ // 4.Update the sink status to success
+ final String info = "success to create Greenplum resource";
+ sinkService.updateStatus(sinkInfo.getId(), SinkStatus.CONFIG_SUCCESSFUL.getCode(), info);
+ LOG.info(info + " for sinkInfo={}", sinkInfo);
+ // 4. close connection.
+ } catch (Throwable e) {
+ String errMsg = "create Greenplum table failed: " + e.getMessage();
+ LOG.error(errMsg, e);
+ sinkService.updateStatus(sinkInfo.getId(), SinkStatus.CONFIG_FAILED.getCode(), errMsg);
+ throw new WorkflowException(errMsg);
+ }
+ LOG.info("success create Greenplum table for data sink [" + sinkInfo.getId() + "]");
+ }
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/greenplum/GreenplumSqlBuilder.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/greenplum/GreenplumSqlBuilder.java
new file mode 100644
index 000000000..257397b57
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/greenplum/GreenplumSqlBuilder.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.resource.sink.greenplum;
+
+import org.apache.commons.compress.utils.Lists;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.pojo.sink.greenplum.GreenplumColumnInfo;
+import org.apache.inlong.manager.pojo.sink.greenplum.GreenplumTableInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class GreenplumSqlBuilder {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(GreenplumSqlBuilder.class);
+
+ /**
+ * Build SQL to check whether the table exists.
+ *
+ * @param schemaName Greenplum schema name
+ * @param tableName Greenplum table name
+ * @return the check table SQL string
+ */
+ public static String getCheckTable(final String schemaName, final String tableName) {
+ final StringBuilder sqlBuilder = new StringBuilder()
+ .append("SELECT COUNT(*) FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = '")
+ .append(schemaName)
+ .append("' AND TABLE_TYPE = 'BASE TABLE' ")
+ .append(" AND TABLE_NAME = '")
+ .append(tableName)
+ .append("' ;");
+ LOGGER.info("check table sql: {}", sqlBuilder);
+ return sqlBuilder.toString();
+ }
+
+ /**
+ * Build SQL to check whether the schema exists.
+ *
+ * @param schemaName
+ * @return
+ */
+ public static String getCheckSchema(final String schemaName) {
+ return new StringBuilder()
+ .append("SELECT COUNT(1) FROM INFORMATION_SCHEMA.SCHEMATA ")
+ .append(" WHERE SCHEMA_NAME = \'")
+ .append(schemaName)
+ .append("\';")
+ .toString();
+ }
+
+ /**
+ * Build create Greenplum schema SQL String
+ *
+ * @param schemaName schema name
+ * @param user user name
+ * @return SQL String
+ */
+ public static String buildCreateSchema(final String schemaName, final String user) {
+ return new StringBuilder()
+ .append(" CREATE SCHEMA \"")
+ .append(schemaName)
+ .append("\" AUTHORIZATION \"")
+ .append(user)
+ .append("\";")
+ .toString();
+ }
+
+ /**
+ * Build SQL to check whether the column exists.
+ *
+ * @param schemaName Greenplum table name
+ * @param columnName Greenplum column name
+ * @return the check column SQL string
+ */
+ public static String getCheckColumn(final String schemaName, final String tableName, final String columnName) {
+ final StringBuilder sqlBuilder = new StringBuilder()
+ .append("SELECT COUNT(*) FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA = '")
+ .append(schemaName)
+ .append("' AND TABLE_NAME = '")
+ .append(tableName)
+ .append("' AND COLUMN_NAME = '")
+ .append(columnName)
+ .append("' ;");
+ LOGGER.info("check table sql: {}", sqlBuilder);
+ return sqlBuilder.toString();
+ }
+
+ /**
+ * Build create table SQL by GreenplumTableInfo.
+ *
+ * @param table Greenplum table info {@link GreenplumTableInfo}
+ * @return the create table SQL String
+ */
+ public static List<String> buildCreateTableSql(final GreenplumTableInfo table) {
+ final List<String> sqls = Lists.newArrayList();
+ final StringBuilder createSql = new StringBuilder()
+ .append("CREATE TABLE ").append(table.getSchemaName())
+ .append(".\"")
+ .append(table.getTableName())
+ .append("\"")
+ .append(buildCreateColumnsSql(table));
+ sqls.add(createSql.toString());
+
+ // column comments
+ sqls.addAll(getColumnsComment(table.getSchemaName(), table.getTableName(), table.getColumns()));
+ // table comment
+ if (StringUtils.isNotEmpty(table.getComment())) {
+ sqls.add(getTableComment(table));
+ }
+ LOGGER.info("create table sql : {}", sqls);
+ return sqls;
+ }
+
+ /**
+ * Build add columns SQL.
+ *
+ * @param schemaName Greenplum schema name
+ * @param tableName Greenplum table name
+ * @param columnList Greenplum column list {@link List}
+ * @return add column SQL string list
+ */
+ public static List<String> buildAddColumnsSql(final String schemaName, final String tableName,
+ List<GreenplumColumnInfo> columnList) {
+ final List<String> resultList = Lists.newArrayList();
+ final StringBuilder sqlBuilder = new StringBuilder();
+
+ columnList.forEach(columnInfo -> {
+ sqlBuilder.append("ALTER TABLE \"")
+ .append(schemaName)
+ .append("\".\"")
+ .append(tableName)
+ .append("\" ADD \"")
+ .append(columnInfo.getName())
+ .append("\" ")
+ .append(columnInfo.getType())
+ .append(" ");
+ resultList.add(sqlBuilder.toString());
+ sqlBuilder.delete(0, sqlBuilder.length());
+ });
+ resultList.addAll(getColumnsComment(schemaName, tableName, columnList));
+ LOGGER.info("add columns sql={}", resultList);
+ return resultList;
+ }
+
+ /**
+ * Build create column SQL.
+ *
+ * @param table Greenplum table info {@link GreenplumTableInfo}
+ * @return create column SQL string
+ */
+ private static String buildCreateColumnsSql(final GreenplumTableInfo table) {
+ final List<String> columnList = getColumnsInfo(table.getColumns());
+ final StringBuilder sql = new StringBuilder()
+ .append(" (")
+ .append(StringUtils.join(columnList, ","));
+ if (!StringUtils.isEmpty(table.getPrimaryKey())) {
+ sql.append(", PRIMARY KEY (")
+ .append(table.getPrimaryKey())
+ .append(")");
+ }
+ sql.append(") ");
+ return sql.toString();
+ }
+
+ /**
+ * Build column info by GreenplumColumnInfo list.
+ *
+ * @param columns Greenplum column info {@link GreenplumColumnInfo} list
+ * @return the SQL list
+ */
+ private static List<String> getColumnsInfo(final List<GreenplumColumnInfo> columns) {
+ final List<String> columnList = new ArrayList<>();
+ final StringBuilder columnBuilder = new StringBuilder();
+
+ columns.forEach(columnInfo -> {
+ columnBuilder.append("\"")
+ .append(columnInfo.getName())
+ .append("\" ")
+ .append(columnInfo.getType());
+ columnList.add(columnBuilder.toString());
+ columnBuilder.delete(0, columnBuilder.length());
+ });
+ return columnList;
+ }
+
+ /**
+ * Build columns comment SQLs
+ *
+ * @param tableName Greenplum table name
+ * @param columns Greenplum colum list {@link GreenplumColumnInfo}
+ * @return the SQL String list
+ */
+ private static List<String> getColumnsComment(final String schemaName, final String tableName,
+ List<GreenplumColumnInfo> columns) {
+ final List<String> commentList = new ArrayList<>();
+ for (GreenplumColumnInfo columnInfo : columns) {
+ if (StringUtils.isNoneBlank(columnInfo.getComment())) {
+ StringBuilder commSql = new StringBuilder();
+ commSql.append("COMMENT ON COLUMN \"")
+ .append(schemaName)
+ .append("\".\"")
+ .append(tableName)
+ .append("\".\"")
+ .append(columnInfo.getName())
+ .append("\" IS \'")
+ .append(columnInfo.getComment())
+ .append("\' ;");
+ commentList.add(commSql.toString());
+ }
+ }
+ return commentList;
+ }
+
+ /**
+ * Build table comment SQL
+ *
+ * @param tableInfo Greenplum table info {@link GreenplumTableInfo}
+ * @return the SQL String
+ */
+ private static String getTableComment(final GreenplumTableInfo tableInfo) {
+ return new StringBuilder()
+ .append("COMMENT ON TABLE \"")
+ .append(tableInfo.getSchemaName())
+ .append("\".\"")
+ .append(tableInfo.getTableName())
+ .append("\" IS \'")
+ .append(tableInfo.getComment())
+ .append("\';")
+ .toString();
+ }
+
+ /**
+ * Build query table's all cloumn SQL.
+ *
+ * @param schemaName Greenplum schema name
+ * @param tableName Greenplum table name
+ * @return desc table SQL string
+ */
+ public static String buildDescTableSql(final String schemaName, final String tableName) {
+ StringBuilder sql = new StringBuilder().append(
+ "SELECT A.COLUMN_NAME,A.UDT_NAME,C.DESCRIPTION FROM INFORMATION_SCHEMA.COLUMNS A")
+ .append(" LEFT JOIN (SELECT PC.OID AS OOID,PN.NSPNAME,PC.RELNAME")
+ .append(" FROM PG_CLASS PC LEFT OUTER JOIN PG_NAMESPACE PN ON PC.RELNAMESPACE = PN.OID ")
+ .append(" WHERE PN.NSPNAME ='")
+ .append(schemaName)
+ .append("' AND PC.RELNAME = '")
+ .append(tableName)
+ .append("') B ON A.TABLE_SCHEMA = B.NSPNAME AND A.TABLE_NAME = B.RELNAME")
+ .append(" LEFT JOIN PG_CATALOG.PG_DESCRIPTION C ")
+ .append("ON B.OOID = C.OBJOID AND A.ORDINAL_POSITION = C.OBJSUBID")
+ .append(" WHERE A.TABLE_SCHEMA = '")
+ .append(schemaName)
+ .append("' AND A.TABLE_NAME = '")
+ .append(tableName)
+ .append("' ORDER BY C.OBJSUBID ;");
+ LOGGER.info("desc table sql={}", sql);
+ return sql.toString();
+ }
+}
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/GreenplumSinkServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/GreenplumSinkServiceTest.java
index 50bcc9de4..6d63484e7 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/GreenplumSinkServiceTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/GreenplumSinkServiceTest.java
@@ -17,19 +17,25 @@
package org.apache.inlong.manager.service.sink;
+import com.google.common.collect.Lists;
import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.consts.SinkType;
import org.apache.inlong.manager.pojo.sink.SinkField;
import org.apache.inlong.manager.pojo.sink.SinkRequest;
import org.apache.inlong.manager.pojo.sink.StreamSink;
+import org.apache.inlong.manager.pojo.sink.greenplum.GreenplumColumnInfo;
import org.apache.inlong.manager.pojo.sink.greenplum.GreenplumSink;
import org.apache.inlong.manager.pojo.sink.greenplum.GreenplumSinkRequest;
+import org.apache.inlong.manager.pojo.sink.greenplum.GreenplumTableInfo;
import org.apache.inlong.manager.service.ServiceBaseTest;
import org.apache.inlong.manager.service.core.impl.InlongStreamServiceTest;
+import org.apache.inlong.manager.service.resource.sink.greenplum.GreenplumJdbcUtils;
import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
+import java.sql.Connection;
import java.util.ArrayList;
import java.util.List;
@@ -109,4 +115,65 @@ public class GreenplumSinkServiceTest extends ServiceBaseTest {
deleteSink(sinkId);
}
+
+ /**
+ * Just using in local test.
+ */
+ @Disabled
+ public void testDbResource() {
+ final String url = "jdbc:postgresql://127.0.0.1:5432/testdb";
+ final String username = "test";
+ final String password = "123456";
+ final String tableName = "test02";
+ final String schemaName = "public";
+
+ try (Connection connection = GreenplumJdbcUtils.getConnection(url, username, password)) {
+ GreenplumTableInfo tableInfo = bulidTestGreenplumTableInfo(username, schemaName, tableName);
+ GreenplumJdbcUtils.createTable(connection, tableInfo);
+ List<GreenplumColumnInfo> addColumns = buildAddColumns();
+ GreenplumJdbcUtils.addColumns(connection, schemaName, tableName, addColumns);
+ List<GreenplumColumnInfo> columns = GreenplumJdbcUtils.getColumns(connection, schemaName, tableName);
+ Assertions.assertEquals(columns.size(), tableInfo.getColumns().size() + addColumns.size());
+ } catch (Exception e) {
+ // print to local console
+ e.printStackTrace();
+ }
+ }
+
+ /**
+ * Build add Greenplum column info.
+ *
+ * @return {@link List}
+ */
+ private List<GreenplumColumnInfo> buildAddColumns() {
+ List<GreenplumColumnInfo> addColums = Lists.newArrayList(
+ new GreenplumColumnInfo("test1", "int", "test1"),
+ new GreenplumColumnInfo("test2", "varchar(30)", "test2"),
+ new GreenplumColumnInfo("Test1", "varchar(50)", "Test1")
+ );
+ return addColums;
+ }
+
+ /**
+ * Build test Greenplum table info.
+ *
+ * @param userName Greenplum database name
+ * @param tableName Greenplum table name
+ * @return {@link GreenplumTableInfo}
+ */
+ private GreenplumTableInfo bulidTestGreenplumTableInfo(final String userName, final String schemaName,
+ final String tableName) {
+ List<GreenplumColumnInfo> columns = Lists.newArrayList(
+ new GreenplumColumnInfo("id", "int", "id"),
+ new GreenplumColumnInfo("cell", "varchar(25)", "cell"),
+ new GreenplumColumnInfo("name", "varchar(50)", "name")
+ );
+ final GreenplumTableInfo tableInfo = new GreenplumTableInfo();
+ tableInfo.setColumns(columns);
+ tableInfo.setTableName(tableName);
+ tableInfo.setPrimaryKey("id");
+ tableInfo.setSchemaName(schemaName);
+ tableInfo.setComment(tableName);
+ return tableInfo;
+ }
}