You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/07/25 10:38:23 UTC

[inlong] branch master updated: [INLONG-5086][Manager] Support create Oracle databases and tables (#5187)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 6c8f5bb70 [INLONG-5086][Manager] Support create Oracle databases and tables (#5187)
6c8f5bb70 is described below

commit 6c8f5bb7050250c4ce7d2d4ed0cef3a7a7de2e1c
Author: haibo.duan <dh...@live.cn>
AuthorDate: Mon Jul 25 18:38:19 2022 +0800

    [INLONG-5086][Manager] Support create Oracle databases and tables (#5187)
---
 .../common/pojo/sink/oracle/OracleColumnInfo.java  |  37 ++++
 .../common/pojo/sink/oracle/OracleSinkDTO.java     |  23 +-
 .../common/pojo/sink/oracle/OracleTableInfo.java   |  39 ++++
 .../service/resource/oracle/OracleJdbcUtils.java   | 234 +++++++++++++++++++++
 .../resource/oracle/OracleResourceOperator.java    | 130 ++++++++++++
 .../service/resource/oracle/OracleSqlBuilder.java  | 191 +++++++++++++++++
 .../service/core/sink/OracleSinkServiceTest.java   |  83 +++++++-
 7 files changed, 733 insertions(+), 4 deletions(-)

diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/oracle/OracleColumnInfo.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/oracle/OracleColumnInfo.java
new file mode 100644
index 000000000..da1b634b5
--- /dev/null
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/oracle/OracleColumnInfo.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.pojo.sink.oracle;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * Oracle column info.
+ */
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class OracleColumnInfo {
+
+    private String name;
+
+    private String type;
+
+    private String comment;
+}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/oracle/OracleSinkDTO.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/oracle/OracleSinkDTO.java
index 94dd47925..6e7268efb 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/oracle/OracleSinkDTO.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/oracle/OracleSinkDTO.java
@@ -26,11 +26,11 @@ import lombok.Data;
 import lombok.NoArgsConstructor;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
-import org.apache.inlong.manager.common.pojo.sink.mysql.MySQLSinkDTO;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.validation.constraints.NotNull;
+import java.util.List;
 import java.util.Map;
 
 /**
@@ -43,9 +43,10 @@ import java.util.Map;
 public class OracleSinkDTO {
 
     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
-    private static final Logger LOGGER = LoggerFactory.getLogger(MySQLSinkDTO.class);
+    private static final Logger LOGGER = LoggerFactory.getLogger(OracleSinkDTO.class);
 
-    @ApiModelProperty("Oracle JDBC URL, such as jdbc:oracle:thin://host:port/database")
+    @ApiModelProperty("Oracle JDBC URL,Such as jdbc:oracle:thin@host:port:sid "
+            + "or jdbc:oracle:thin@host:port/service_name")
     private String jdbcUrl;
 
     @ApiModelProperty("Username for JDBC URL")
@@ -89,4 +90,20 @@ public class OracleSinkDTO {
             throw new BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage() + ": " + e.getMessage());
         }
     }
+
+    /**
+     * Get Oracle table info
+     *
+     * @param oracleSink Oracle sink dto,{@link OracleSinkDTO}
+     * @param columnList Oracle column info list,{@link OracleColumnInfo}
+     * @return {@link OracleTableInfo}
+     */
+    public static OracleTableInfo getTableInfo(OracleSinkDTO oracleSink, List<OracleColumnInfo> columnList) {
+        OracleTableInfo tableInfo = new OracleTableInfo();
+        tableInfo.setTableName(oracleSink.getTableName());
+        tableInfo.setPrimaryKey(oracleSink.getPrimaryKey());
+        tableInfo.setUserName(oracleSink.getUsername());
+        tableInfo.setColumns(columnList);
+        return tableInfo;
+    }
 }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/oracle/OracleTableInfo.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/oracle/OracleTableInfo.java
new file mode 100644
index 000000000..c6f2bd120
--- /dev/null
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/oracle/OracleTableInfo.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.pojo.sink.oracle;
+
+import lombok.Data;
+
+import java.util.List;
+
+/**
+ * Oracle table info.
+ */
+@Data
+public class OracleTableInfo {
+
+    private String tableName;
+
+    private String comment;
+
+    private String primaryKey;
+
+    private String userName;
+
+    private List<OracleColumnInfo> columns;
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/oracle/OracleJdbcUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/oracle/OracleJdbcUtils.java
new file mode 100644
index 000000000..0127ad479
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/oracle/OracleJdbcUtils.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.resource.oracle;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.pojo.sink.oracle.OracleColumnInfo;
+import org.apache.inlong.manager.common.pojo.sink.oracle.OracleTableInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Utils for Oracle JDBC.
+ */
+public class OracleJdbcUtils {
+
+    private static final String ORACLE_JDBC_PREFIX = "jdbc:oracle";
+
+    private static final String ORACLE_DRIVER_CLASS = "oracle.jdbc.driver.OracleDriver";
+
+    private static final Logger LOG = LoggerFactory.getLogger(OracleJdbcUtils.class);
+
+    /**
+     * Get Oracle connection from the url and user.
+     *
+     * @param url jdbc url,such as jdbc:oracle:thin@host:port:sid or jdbc:oracle:thin@host:port/service_name
+     * @param user Username for JDBC URL
+     * @param password User password
+     * @return {@link Connection}
+     * @throws Exception on get connection error
+     */
+    public static Connection getConnection(String url, String user, String password)
+            throws Exception {
+        if (StringUtils.isBlank(url) || !url.startsWith(ORACLE_JDBC_PREFIX)) {
+            throw new Exception("Oracle server URL was invalid, it should start with jdbc:oracle");
+        }
+        Connection conn;
+        try {
+            Class.forName(ORACLE_DRIVER_CLASS);
+            conn = DriverManager.getConnection(url, user, password);
+        } catch (Exception e) {
+            String errorMsg = "get Oracle connection error, please check Oracle JDBC url, username or password!";
+            LOG.error(errorMsg, e);
+            throw new Exception(errorMsg + " other error msg: " + e.getMessage());
+        }
+        if (conn == null) {
+            throw new Exception("get Oracle connection failed, please contact administrator.");
+        }
+        LOG.info("get Oracle connection success, url={}", url);
+        return conn;
+    }
+
+    /**
+     * Execute SQL command on Oracle.
+     *
+     * @param conn JDBC Connection  {@link Connection}
+     * @param sql SQL string to be executed
+     * @throws Exception on execute SQL error
+     */
+    public static void executeSql(Connection conn, String sql) throws Exception {
+        Statement stmt = conn.createStatement();
+        LOG.info("execute sql [{}] success !", sql);
+        stmt.execute(sql);
+        stmt.close();
+    }
+
+    /**
+     * Execute query SQL on Oracle.
+     *
+     * @param conn JDBC Connection  {@link Connection}
+     * @param sql SQL string to be executed
+     * @return {@link ResultSet}
+     * @throws Exception on execute query SQL error
+     */
+    public static ResultSet executeQuerySql(Connection conn, String sql)
+            throws Exception {
+        Statement stmt = conn.createStatement();
+        ResultSet resultSet = stmt.executeQuery(sql);
+        LOG.info("execute sql [{}] success !", sql);
+        return resultSet;
+    }
+
+    /**
+     * Execute batch query SQL on Oracle.
+     *
+     * @param conn JDBC Connection  {@link Connection}
+     * @param sqls SQL string to be executed
+     * @throws Exception on get execute SQL batch error
+     */
+    public static void executeSqlBatch(Connection conn, List<String> sqls)
+            throws Exception {
+        Statement stmt = conn.createStatement();
+        for (String entry : sqls) {
+            stmt.execute(entry);
+        }
+        stmt.close();
+        LOG.info("execute sql [{}] success! ", sqls);
+    }
+
+    /**
+     * Create Oracle table by OracleTableInfo
+     *
+     * @param conn JDBC Connection  {@link Connection}
+     * @param tableInfo Oracle table info  {@link OracleTableInfo}
+     * @throws Exception on create table error
+     */
+    public static void createTable(Connection conn, OracleTableInfo tableInfo)
+            throws Exception {
+        if (checkTablesExist(conn, tableInfo.getUserName(), tableInfo.getTableName())) {
+            LOG.info("The table [{}] are exists !", tableInfo.getTableName());
+        } else {
+            List<String> createTableSqls = OracleSqlBuilder.buildCreateTableSql(tableInfo);
+            executeSqlBatch(conn, createTableSqls);
+            LOG.info("execute sql [{}] success! ", createTableSqls);
+        }
+    }
+
+    /**
+     * Check tables from the Oracle information_schema.
+     *
+     * @param conn JDBC Connection  {@link Connection}
+     * @param userName Oracle database name
+     * @param tableName Oracle table name
+     * @return true if table exist, otherwise false
+     * @throws Exception on check table exist error
+     */
+    public static boolean checkTablesExist(Connection conn, String userName, String tableName)
+            throws Exception {
+        boolean result = false;
+        String checkTableSql = OracleSqlBuilder.getCheckTable(userName, tableName);
+        ResultSet resultSet = executeQuerySql(conn, checkTableSql);
+        if (null != resultSet && resultSet.next()) {
+            int size = resultSet.getInt(1);
+            if (size > 0) {
+                LOG.info("check table exist for username={} table={}, result={}", userName, tableName, result);
+                return true;
+            }
+        }
+        resultSet.close();
+        return result;
+    }
+
+    /**
+     * Check whether the column exists in the table.
+     *
+     * @param conn JDBC Connection  {@link Connection}
+     * @param tableName Oracle table name
+     * @param column Oracle table column name
+     * @return true if column exist in the table, otherwise false
+     * @throws Exception on check column exist error
+     */
+    public static boolean checkColumnExist(Connection conn, String tableName, String column)
+            throws Exception {
+        boolean result = false;
+        String checkColumnSql = OracleSqlBuilder.getCheckColumn(tableName, column);
+        ResultSet resultSet = executeQuerySql(conn, checkColumnSql);
+        if (resultSet != null && resultSet.next()) {
+            int count = resultSet.getInt(1);
+            if (count > 0) {
+                result = true;
+            }
+        }
+        resultSet.close();
+        LOG.info("check column exist for table={}, column={}, result={} ", tableName, column, result);
+        return result;
+    }
+
+    /**
+     * Query all columns of the tableName.
+     *
+     * @param conn JDBC Connection  {@link Connection}
+     * @param tableName Oracle table name
+     * @return {@link List}
+     * @throws Exception on get columns error
+     */
+    public static List<OracleColumnInfo> getColumns(Connection conn, String tableName)
+            throws Exception {
+        String querySql = OracleSqlBuilder.buildDescTableSql(tableName);
+        Statement stmt = conn.createStatement();
+        ResultSet rs = stmt.executeQuery(querySql);
+        List<OracleColumnInfo> columnList = new ArrayList<>();
+        while (rs.next()) {
+            OracleColumnInfo columnInfo = new OracleColumnInfo();
+            columnInfo.setName(rs.getString(1));
+            columnInfo.setType(rs.getString(2));
+            columnInfo.setComment(rs.getString(3));
+            columnList.add(columnInfo);
+        }
+        rs.close();
+        return columnList;
+    }
+
+    /**
+     * Add columns for Oracle table.
+     *
+     * @param conn JDBC Connection  {@link Connection}
+     * @param tableName Oracle table name
+     * @param columns Oracle columns to be added
+     * @throws Exception on add columns error
+     */
+    public static void addColumns(Connection conn, String tableName, List<OracleColumnInfo> columns)
+            throws Exception {
+        List<OracleColumnInfo> columnInfos = new ArrayList<>();
+
+        for (OracleColumnInfo columnInfo : columns) {
+            if (!checkColumnExist(conn, tableName, columnInfo.getName())) {
+                columnInfos.add(columnInfo);
+            }
+        }
+        List<String> addColumnSql = OracleSqlBuilder.buildAddColumnsSql(tableName, columnInfos);
+        executeSqlBatch(conn, addColumnSql);
+    }
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/oracle/OracleResourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/oracle/OracleResourceOperator.java
new file mode 100644
index 000000000..b4041e6e2
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/oracle/OracleResourceOperator.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.resource.oracle;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.enums.SinkStatus;
+import org.apache.inlong.manager.common.enums.SinkType;
+import org.apache.inlong.manager.common.exceptions.WorkflowException;
+import org.apache.inlong.manager.common.pojo.sink.SinkInfo;
+import org.apache.inlong.manager.common.pojo.sink.oracle.OracleColumnInfo;
+import org.apache.inlong.manager.common.pojo.sink.oracle.OracleSinkDTO;
+import org.apache.inlong.manager.common.pojo.sink.oracle.OracleTableInfo;
+import org.apache.inlong.manager.dao.entity.StreamSinkFieldEntity;
+import org.apache.inlong.manager.dao.mapper.StreamSinkFieldEntityMapper;
+import org.apache.inlong.manager.service.resource.SinkResourceOperator;
+import org.apache.inlong.manager.service.sink.StreamSinkService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+
+import java.sql.Connection;
+import java.util.ArrayList;
+import java.util.List;
+
+public class OracleResourceOperator implements SinkResourceOperator {
+
+
+    private static final Logger LOG = LoggerFactory.getLogger(OracleResourceOperator.class);
+
+    @Autowired
+    private StreamSinkService sinkService;
+
+    @Autowired
+    private StreamSinkFieldEntityMapper fieldEntityMapper;
+
+    @Override
+    public Boolean accept(SinkType sinkType) {
+        return SinkType.ORACLE == sinkType;
+    }
+
+    @Override
+    public void createSinkResource(SinkInfo sinkInfo) {
+        LOG.info("begin to create Oracle resources sinkId={}", sinkInfo.getId());
+        if (SinkStatus.CONFIG_SUCCESSFUL.getCode().equals(sinkInfo.getStatus())) {
+            LOG.warn("Oracle resource [" + sinkInfo.getId() + "] already success, skip to create");
+            return;
+        } else if (InlongConstants.DISABLE_CREATE_RESOURCE.equals(sinkInfo.getEnableCreateResource())) {
+            LOG.warn("create resource was disabled, skip to create for [" + sinkInfo.getId() + "]");
+            return;
+        }
+        this.createTable(sinkInfo);
+    }
+
+    /**
+     * Create Oracle table by SinkInfo.
+     *
+     * @param sinkInfo {@link SinkInfo}
+     */
+    private void createTable(SinkInfo sinkInfo) {
+        LOG.info("begin to create Oracle table for sinkId={}", sinkInfo.getId());
+        List<StreamSinkFieldEntity> fieldList = fieldEntityMapper.selectBySinkId(sinkInfo.getId());
+        if (CollectionUtils.isEmpty(fieldList)) {
+            LOG.warn("no Oracle fields found, skip to create table for sinkId={}", sinkInfo.getId());
+        }
+        // set columns
+        List<OracleColumnInfo> columnList = new ArrayList<>();
+        for (StreamSinkFieldEntity field : fieldList) {
+            OracleColumnInfo columnInfo = new OracleColumnInfo();
+            columnInfo.setName(field.getFieldName());
+            columnInfo.setType(field.getFieldType());
+            columnInfo.setComment(field.getFieldComment());
+            columnList.add(columnInfo);
+        }
+
+        OracleSinkDTO oracleSink = OracleSinkDTO.getFromJson(sinkInfo.getExtParams());
+        OracleTableInfo tableInfo = OracleSinkDTO.getTableInfo(oracleSink, columnList);
+        String url = oracleSink.getJdbcUrl();
+        String user = oracleSink.getUsername();
+        String password = oracleSink.getPassword();
+        String tableName = tableInfo.getTableName();
+        Connection conn = null;
+        try {
+            conn = OracleJdbcUtils.getConnection(url, user, password);
+
+            // In Oracle, there is no need to consider whether the database exists
+            // 1.If table not exists, create it
+            OracleJdbcUtils.createTable(conn, tableInfo);
+            // 2. table exists, add columns - skip the exists columns
+            OracleJdbcUtils.addColumns(conn, tableName, columnList);
+            // 3. update the sink status to success
+            String info = "success to create Oracle resource";
+            sinkService.updateStatus(sinkInfo.getId(), SinkStatus.CONFIG_SUCCESSFUL.getCode(), info);
+            LOG.info(info + " for sinkInfo={}", sinkInfo);
+            // 4. close connection.
+            conn.close();
+        } catch (Throwable e) {
+            String errMsg = "create Oracle table failed: " + e.getMessage();
+            LOG.error(errMsg, e);
+            sinkService.updateStatus(sinkInfo.getId(), SinkStatus.CONFIG_FAILED.getCode(), errMsg);
+            throw new WorkflowException(errMsg);
+        } finally {
+            try {
+                if (null != conn) {
+                    conn.close();
+                    conn = null;
+                }
+            } catch (Throwable e) {
+                String errMsg = "close Oracle connection failed: " + e.getMessage();
+                throw new WorkflowException(errMsg);
+            }
+        }
+        LOG.info("success create Oracle table for data sink [" + sinkInfo.getId() + "]");
+    }
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/oracle/OracleSqlBuilder.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/oracle/OracleSqlBuilder.java
new file mode 100644
index 000000000..dcac4b6bb
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/oracle/OracleSqlBuilder.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.resource.oracle;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.pojo.sink.oracle.OracleColumnInfo;
+import org.apache.inlong.manager.common.pojo.sink.oracle.OracleTableInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class OracleSqlBuilder {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(OracleSqlBuilder.class);
+
+    /**
+     * Build SQL to check whether the table exists.
+     *
+     * @param userName Oracle database name
+     * @param tableName Oracle table name
+     * @return the check table SQL string
+     */
+    public static String getCheckTable(String userName, String tableName) {
+        StringBuilder sqlBuilder = new StringBuilder();
+        sqlBuilder.append("SELECT COUNT(*) FROM ALL_TABLES WHERE OWNER = UPPER('")
+                .append(userName)
+                .append("') ")
+                .append("AND TABLE_NAME = '")
+                .append(tableName)
+                .append("' ");
+        LOGGER.info("check table sql: {}", sqlBuilder);
+        return sqlBuilder.toString();
+    }
+
+    /**
+     * Build SQL to check whether the column exists.
+     *
+     * @param tableName Oracle table name
+     * @param columnName Oracle column name
+     * @return the check column SQL string
+     */
+    public static String getCheckColumn(String tableName, String columnName) {
+        StringBuilder sqlBuilder = new StringBuilder();
+        sqlBuilder.append("SELECT count(1) ")
+                .append(" from  USER_TAB_COLUMNS where TABLE_NAME= '")
+                .append(tableName)
+                .append("' and COLUMN_NAME = '")
+                .append(columnName)
+                .append("' ");
+        LOGGER.info("check table sql: {}", sqlBuilder);
+        return sqlBuilder.toString();
+    }
+
+    /**
+     * Build create table SQL by OracleTableInfo.
+     *
+     * @param table Oracle table info {@link OracleTableInfo}
+     * @return the create table SQL String
+     */
+    public static List<String> buildCreateTableSql(OracleTableInfo table) {
+        List<String> sqls = new ArrayList<>();
+        StringBuilder createSql = new StringBuilder();
+        // Support _ beginning with underscore
+        createSql.append("CREATE TABLE ").append(table.getUserName())
+                .append(".\"")
+                .append(table.getTableName())
+                .append("\"");
+        // Construct columns and partition columns
+        createSql.append(buildCreateColumnsSql(table));
+        sqls.add(createSql.toString());
+        sqls.addAll(getColumnsComment(table.getTableName(), table.getColumns()));
+        LOGGER.info("create table sql: {}", sqls);
+        return sqls;
+    }
+
+    /**
+     * Build add columns SQL.
+     *
+     * @param tableName Oracle table name
+     * @param columnList Oracle column list {@link List}
+     * @return add column SQL string list
+     */
+    public static List<String> buildAddColumnsSql(String tableName, List<OracleColumnInfo> columnList) {
+        List<String> resultList = new ArrayList<>();
+
+        for (OracleColumnInfo columnInfo : columnList) {
+            StringBuilder sqlBuilder = new StringBuilder();
+            sqlBuilder.append("ALTER TABLE \"")
+                    .append(tableName)
+                    .append("\" ADD \"")
+                    .append(columnInfo.getName())
+                    .append("\" ")
+                    .append(columnInfo.getType())
+                    .append(" ");
+            resultList.add(sqlBuilder.toString());
+        }
+        resultList.addAll(getColumnsComment(tableName, columnList));
+        LOGGER.info("add columns sql={}", resultList);
+        return resultList;
+    }
+
+    /**
+     * Build create column SQL.
+     *
+     * @param table Oracle table info {@link OracleColumnInfo}
+     * @return create column SQL string
+     */
+    private static String buildCreateColumnsSql(OracleTableInfo table) {
+        StringBuilder sql = new StringBuilder();
+        sql.append(" (");
+        List<String> columnList = getColumnsInfo(table.getColumns());
+        sql.append(StringUtils.join(columnList, ","));
+        sql.append(") ");
+        LOGGER.info("create columns sql={}", sql);
+        return sql.toString();
+    }
+
+    /**
+     * Build column info by OracleColumnInfo list.
+     *
+     * @param columns Oracle column info {@link OracleColumnInfo} list
+     * @return the SQL list
+     */
+    private static List<String> getColumnsInfo(List<OracleColumnInfo> columns) {
+        List<String> columnList = new ArrayList<>();
+        for (OracleColumnInfo columnInfo : columns) {
+            // Construct columns and partition columns
+            StringBuilder columnBuilder = new StringBuilder();
+            columnBuilder.append("\"")
+                    .append(columnInfo.getName())
+                    .append("\" ")
+                    .append(columnInfo.getType());
+            columnList.add(columnBuilder.toString());
+        }
+        return columnList;
+    }
+
+    private static List<String> getColumnsComment(String tableName, List<OracleColumnInfo> columns) {
+        List<String> commentList = new ArrayList<>();
+        for (OracleColumnInfo columnInfo : columns) {
+            if (StringUtils.isNoneBlank(columnInfo.getComment())) {
+                StringBuilder commSql = new StringBuilder();
+                commSql.append("COMMENT ON COLUMN \"")
+                        .append(tableName)
+                        .append("\".\"")
+                        .append(columnInfo.getName())
+                        .append("\" IS '")
+                        .append(columnInfo.getComment())
+                        .append("' ");
+                commentList.add(commSql.toString());
+            }
+        }
+        return commentList;
+    }
+
+    /**
+     * Build query table SQL.
+     *
+     * @param tableName Oracle table name
+     * @return desc table SQL string
+     */
+    public static String buildDescTableSql(String tableName) {
+        StringBuilder sql = new StringBuilder();
+        sql.append("SELECT A.COLUMN_NAME,A.DATA_TYPE,B.COMMENTS ")
+                .append(" FROM USER_TAB_COLUMNS A LEFT JOIN USER_COL_COMMENTS B ")
+                .append("  ON A.TABLE_NAME=B.TABLE_NAME AND A.COLUMN_NAME=B.COLUMN_NAME ")
+                .append("WHERE  A.TABLE_NAME = '")
+                .append(tableName)
+                .append("'  ORDER  BY A.COLUMN_ID ");
+        LOGGER.info("desc table sql={}", sql);
+        return sql.toString();
+    }
+
+}
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/OracleSinkServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/OracleSinkServiceTest.java
index dfac64cd9..ea0d32f77 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/OracleSinkServiceTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/OracleSinkServiceTest.java
@@ -21,16 +21,21 @@ import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.SinkType;
 import org.apache.inlong.manager.common.pojo.sink.SinkField;
 import org.apache.inlong.manager.common.pojo.sink.StreamSink;
+import org.apache.inlong.manager.common.pojo.sink.oracle.OracleColumnInfo;
 import org.apache.inlong.manager.common.pojo.sink.oracle.OracleSink;
 import org.apache.inlong.manager.common.pojo.sink.oracle.OracleSinkRequest;
+import org.apache.inlong.manager.common.pojo.sink.oracle.OracleTableInfo;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
 import org.apache.inlong.manager.service.ServiceBaseTest;
 import org.apache.inlong.manager.service.core.impl.InlongStreamServiceTest;
+import org.apache.inlong.manager.service.resource.oracle.OracleJdbcUtils;
 import org.apache.inlong.manager.service.sink.StreamSinkService;
 import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.springframework.beans.factory.annotation.Autowired;
 
+import java.sql.Connection;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -61,7 +66,7 @@ public class OracleSinkServiceTest extends ServiceBaseTest {
         sinkInfo.setInlongStreamId(globalStreamId);
         sinkInfo.setSinkType(SinkType.SINK_ORACLE);
 
-        sinkInfo.setJdbcUrl("jdbc:oracle://host:port/database");
+        sinkInfo.setJdbcUrl("jdbc:oracle:thin@host:port/database");
         sinkInfo.setUsername("oracle");
         sinkInfo.setPassword("inlong");
         sinkInfo.setTableName("user");
@@ -110,4 +115,80 @@ public class OracleSinkServiceTest extends ServiceBaseTest {
         deleteSink(sinkId);
     }
 
+    /**
+     * Just using in local test.
+     */
+    @Disabled
+    public void testDbResource() {
+        String url = "jdbc:oracle:thin:@localhost:1521/ORCLCDB";
+        String username = "c###inlong_test";
+        String password = "123456";
+        String tableName = "test02";
+
+        try {
+            Connection connection = OracleJdbcUtils.getConnection(url, username, password);
+            OracleTableInfo tableInfo = bulidTestOracleTableInfo(username, tableName);
+            OracleJdbcUtils.createTable(connection, tableInfo);
+            List<OracleColumnInfo> addColumns = buildAddColumns();
+            OracleJdbcUtils.addColumns(connection, tableName, addColumns);
+            List<OracleColumnInfo> columns = OracleJdbcUtils.getColumns(connection, tableName);
+            Assertions.assertEquals(columns.size(), tableInfo.getColumns().size() + addColumns.size());
+            connection.close();
+        } catch (Exception e) {
+            // print to local console
+            e.printStackTrace();
+        }
+    }
+
+    /**
+     * Build add Oracle column info.
+     *
+     * @return {@link List}
+     */
+    private List<OracleColumnInfo> buildAddColumns() {
+        List<OracleColumnInfo> addColum = new ArrayList<>();
+        OracleColumnInfo test1 = new OracleColumnInfo();
+        addColum.add(test1);
+        test1.setName("test1");
+        test1.setType("NUMBER(16)");
+        test1.setComment("test1");
+        OracleColumnInfo test2 = new OracleColumnInfo();
+        addColum.add(test2);
+        test2.setName("test2");
+        test2.setType("VARCHAR2(10)");
+        test2.setComment("test2");
+        return addColum;
+    }
+
+    /**
+     * Build test Oracle table info.
+     *
+     * @param userName Oracle database name
+     * @param tableName Oracle table name
+     * @return {@link OracleTableInfo}
+     */
+    private OracleTableInfo bulidTestOracleTableInfo(String userName, String tableName) {
+        OracleTableInfo oracleTableInfo = new OracleTableInfo();
+        oracleTableInfo.setTableName(tableName);
+        List<OracleColumnInfo> columnInfos = new ArrayList<>();
+        OracleColumnInfo id = new OracleColumnInfo();
+        columnInfos.add(id);
+        id.setName("id");
+        id.setType("NUMBER(6)");
+        id.setComment("id");
+        OracleColumnInfo cell = new OracleColumnInfo();
+        columnInfos.add(cell);
+        cell.setName("cell");
+        cell.setType("VARCHAR2(10)");
+        cell.setComment("cell");
+        OracleColumnInfo name = new OracleColumnInfo();
+        columnInfos.add(name);
+        name.setName("name");
+        name.setType("VARCHAR2(20)");
+        name.setComment("name");
+        oracleTableInfo.setColumns(columnInfos);
+        oracleTableInfo.setUserName(userName);
+        return oracleTableInfo;
+    }
+
 }