You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/07/25 10:38:23 UTC
[inlong] branch master updated: [INLONG-5086][Manager] Support create Oracle databases and tables (#5187)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 6c8f5bb70 [INLONG-5086][Manager] Support create Oracle databases and tables (#5187)
6c8f5bb70 is described below
commit 6c8f5bb7050250c4ce7d2d4ed0cef3a7a7de2e1c
Author: haibo.duan <dh...@live.cn>
AuthorDate: Mon Jul 25 18:38:19 2022 +0800
[INLONG-5086][Manager] Support create Oracle databases and tables (#5187)
---
.../common/pojo/sink/oracle/OracleColumnInfo.java | 37 ++++
.../common/pojo/sink/oracle/OracleSinkDTO.java | 23 +-
.../common/pojo/sink/oracle/OracleTableInfo.java | 39 ++++
.../service/resource/oracle/OracleJdbcUtils.java | 234 +++++++++++++++++++++
.../resource/oracle/OracleResourceOperator.java | 130 ++++++++++++
.../service/resource/oracle/OracleSqlBuilder.java | 191 +++++++++++++++++
.../service/core/sink/OracleSinkServiceTest.java | 83 +++++++-
7 files changed, 733 insertions(+), 4 deletions(-)
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/oracle/OracleColumnInfo.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/oracle/OracleColumnInfo.java
new file mode 100644
index 000000000..da1b634b5
--- /dev/null
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/oracle/OracleColumnInfo.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.pojo.sink.oracle;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * Oracle column info.
+ *
+ * <p>Plain data holder describing one column of an Oracle table. Lombok generates the
+ * accessors, a no-argument constructor and an all-arguments constructor whose parameters
+ * follow the field declaration order (name, type, comment).
+ */
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class OracleColumnInfo {
+
+ /** Column name. */
+ private String name;
+
+ /** Column type, for example {@code NUMBER(16)} or {@code VARCHAR2(10)}. */
+ private String type;
+
+ /** Column comment; may be blank when the column has no comment. */
+ private String comment;
+}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/oracle/OracleSinkDTO.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/oracle/OracleSinkDTO.java
index 94dd47925..6e7268efb 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/oracle/OracleSinkDTO.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/oracle/OracleSinkDTO.java
@@ -26,11 +26,11 @@ import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
-import org.apache.inlong.manager.common.pojo.sink.mysql.MySQLSinkDTO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.validation.constraints.NotNull;
+import java.util.List;
import java.util.Map;
/**
@@ -43,9 +43,10 @@ import java.util.Map;
public class OracleSinkDTO {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
- private static final Logger LOGGER = LoggerFactory.getLogger(MySQLSinkDTO.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(OracleSinkDTO.class);
- @ApiModelProperty("Oracle JDBC URL, such as jdbc:oracle:thin://host:port/database")
+ @ApiModelProperty("Oracle JDBC URL,Such as jdbc:oracle:thin@host:port:sid "
+ + "or jdbc:oracle:thin@host:port/service_name")
private String jdbcUrl;
@ApiModelProperty("Username for JDBC URL")
@@ -89,4 +90,20 @@ public class OracleSinkDTO {
throw new BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage() + ": " + e.getMessage());
}
}
+
+    /**
+     * Assemble the {@link OracleTableInfo} for a sink from its configuration and columns.
+     *
+     * @param oracleSink Oracle sink dto supplying the table name, primary key and username
+     * @param columnList Oracle column info list to attach to the table info
+     * @return a new {@link OracleTableInfo} populated from the two arguments
+     */
+    public static OracleTableInfo getTableInfo(OracleSinkDTO oracleSink, List<OracleColumnInfo> columnList) {
+        final OracleTableInfo result = new OracleTableInfo();
+        result.setColumns(columnList);
+        result.setUserName(oracleSink.getUsername());
+        result.setPrimaryKey(oracleSink.getPrimaryKey());
+        result.setTableName(oracleSink.getTableName());
+        return result;
+    }
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/oracle/OracleTableInfo.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/oracle/OracleTableInfo.java
new file mode 100644
index 000000000..c6f2bd120
--- /dev/null
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/oracle/OracleTableInfo.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.pojo.sink.oracle;
+
+import lombok.Data;
+
+import java.util.List;
+
+/**
+ * Oracle table info.
+ *
+ * <p>Plain data holder describing the table to create for an Oracle sink. Lombok generates
+ * the accessors.
+ */
+@Data
+public class OracleTableInfo {
+
+ /** Table name. */
+ private String tableName;
+
+ /** Table comment. */
+ private String comment;
+
+ /** Primary key of the table. */
+ private String primaryKey;
+
+ /** Oracle user name; presumably the schema that owns the table - TODO confirm. */
+ private String userName;
+
+ /** Columns of the table. */
+ private List<OracleColumnInfo> columns;
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/oracle/OracleJdbcUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/oracle/OracleJdbcUtils.java
new file mode 100644
index 000000000..0127ad479
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/oracle/OracleJdbcUtils.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.resource.oracle;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.pojo.sink.oracle.OracleColumnInfo;
+import org.apache.inlong.manager.common.pojo.sink.oracle.OracleTableInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Utils for Oracle JDBC.
+ *
+ * <p>The helpers never close the {@link Connection} passed to them; it stays owned by the caller.
+ * Statements and result sets opened internally are closed before returning, with the exception of
+ * {@link #executeQuerySql(Connection, String)}, which returns an open {@link ResultSet}.
+ */
+public class OracleJdbcUtils {
+
+    private static final String ORACLE_JDBC_PREFIX = "jdbc:oracle";
+
+    private static final String ORACLE_DRIVER_CLASS = "oracle.jdbc.driver.OracleDriver";
+
+    private static final Logger LOG = LoggerFactory.getLogger(OracleJdbcUtils.class);
+
+    /**
+     * Get Oracle connection from the url and user.
+     *
+     * @param url jdbc url, such as jdbc:oracle:thin:@host:port:sid
+     *         or jdbc:oracle:thin:@host:port/service_name
+     * @param user Username for JDBC URL
+     * @param password User password
+     * @return {@link Connection}, never null
+     * @throws Exception if the url is blank or does not start with jdbc:oracle, or if the driver
+     *         cannot be loaded or the connection cannot be opened (the original failure is kept
+     *         as the cause)
+     */
+    public static Connection getConnection(String url, String user, String password)
+            throws Exception {
+        if (StringUtils.isBlank(url) || !url.startsWith(ORACLE_JDBC_PREFIX)) {
+            throw new Exception("Oracle server URL was invalid, it should start with jdbc:oracle");
+        }
+        Connection conn;
+        try {
+            Class.forName(ORACLE_DRIVER_CLASS);
+            conn = DriverManager.getConnection(url, user, password);
+        } catch (Exception e) {
+            String errorMsg = "get Oracle connection error, please check Oracle JDBC url, username or password!";
+            LOG.error(errorMsg, e);
+            // keep the original exception as the cause so callers can see the driver error
+            throw new Exception(errorMsg + " other error msg: " + e.getMessage(), e);
+        }
+        if (conn == null) {
+            throw new Exception("get Oracle connection failed, please contact administrator.");
+        }
+        LOG.info("get Oracle connection success, url={}", url);
+        return conn;
+    }
+
+    /**
+     * Execute SQL command on Oracle.
+     *
+     * @param conn JDBC Connection {@link Connection}
+     * @param sql SQL string to be executed
+     * @throws Exception on execute SQL error
+     */
+    public static void executeSql(Connection conn, String sql) throws Exception {
+        try (Statement stmt = conn.createStatement()) {
+            stmt.execute(sql);
+            // only report success once the statement really ran
+            LOG.info("execute sql [{}] success !", sql);
+        }
+    }
+
+    /**
+     * Execute query SQL on Oracle.
+     *
+     * <p>The returned result set is still open, and so is the statement that produced it. The caller
+     * must close both when done, for example through {@code resultSet.getStatement().close()}.
+     *
+     * @param conn JDBC Connection {@link Connection}
+     * @param sql SQL string to be executed
+     * @return the open {@link ResultSet}
+     * @throws Exception on execute query SQL error
+     */
+    public static ResultSet executeQuerySql(Connection conn, String sql)
+            throws Exception {
+        Statement stmt = conn.createStatement();
+        try {
+            ResultSet resultSet = stmt.executeQuery(sql);
+            LOG.info("execute sql [{}] success !", sql);
+            return resultSet;
+        } catch (Exception e) {
+            // on failure nothing is handed to the caller, so the statement must be released here
+            try {
+                stmt.close();
+            } catch (Exception closeError) {
+                e.addSuppressed(closeError);
+            }
+            throw e;
+        }
+    }
+
+    /**
+     * Execute a list of SQL statements on Oracle, one after another, on a single statement.
+     *
+     * @param conn JDBC Connection {@link Connection}
+     * @param sqls SQL strings to be executed in order
+     * @throws Exception on execute SQL error; statements after the failing one are not executed
+     */
+    public static void executeSqlBatch(Connection conn, List<String> sqls)
+            throws Exception {
+        try (Statement stmt = conn.createStatement()) {
+            for (String entry : sqls) {
+                stmt.execute(entry);
+            }
+        }
+        LOG.info("execute sql [{}] success! ", sqls);
+    }
+
+    /**
+     * Create Oracle table by OracleTableInfo, unless the table already exists.
+     *
+     * @param conn JDBC Connection {@link Connection}
+     * @param tableInfo Oracle table info {@link OracleTableInfo}
+     * @throws Exception on create table error
+     */
+    public static void createTable(Connection conn, OracleTableInfo tableInfo)
+            throws Exception {
+        if (checkTablesExist(conn, tableInfo.getUserName(), tableInfo.getTableName())) {
+            LOG.info("The table [{}] are exists !", tableInfo.getTableName());
+        } else {
+            List<String> createTableSqls = OracleSqlBuilder.buildCreateTableSql(tableInfo);
+            executeSqlBatch(conn, createTableSqls);
+            LOG.info("execute sql [{}] success! ", createTableSqls);
+        }
+    }
+
+    /**
+     * Check whether the table exists, using the SQL built by
+     * {@link OracleSqlBuilder#getCheckTable(String, String)}.
+     *
+     * @param conn JDBC Connection {@link Connection}
+     * @param userName Oracle user name that owns the table
+     * @param tableName Oracle table name
+     * @return true if table exist, otherwise false
+     * @throws Exception on check table exist error
+     */
+    public static boolean checkTablesExist(Connection conn, String userName, String tableName)
+            throws Exception {
+        String checkTableSql = OracleSqlBuilder.getCheckTable(userName, tableName);
+        boolean result = queryCount(conn, checkTableSql) > 0;
+        LOG.info("check table exist for username={} table={}, result={}", userName, tableName, result);
+        return result;
+    }
+
+    /**
+     * Check whether the column exists in the table.
+     *
+     * @param conn JDBC Connection {@link Connection}
+     * @param tableName Oracle table name
+     * @param column Oracle table column name
+     * @return true if column exist in the table, otherwise false
+     * @throws Exception on check column exist error
+     */
+    public static boolean checkColumnExist(Connection conn, String tableName, String column)
+            throws Exception {
+        String checkColumnSql = OracleSqlBuilder.getCheckColumn(tableName, column);
+        boolean result = queryCount(conn, checkColumnSql) > 0;
+        LOG.info("check column exist for table={}, column={}, result={} ", tableName, column, result);
+        return result;
+    }
+
+    /**
+     * Query all columns of the tableName.
+     *
+     * @param conn JDBC Connection {@link Connection}
+     * @param tableName Oracle table name
+     * @return the columns (name, type, comment) in the order returned by the query
+     * @throws Exception on get columns error
+     */
+    public static List<OracleColumnInfo> getColumns(Connection conn, String tableName)
+            throws Exception {
+        String querySql = OracleSqlBuilder.buildDescTableSql(tableName);
+        List<OracleColumnInfo> columnList = new ArrayList<>();
+        try (Statement stmt = conn.createStatement();
+                ResultSet rs = stmt.executeQuery(querySql)) {
+            while (rs.next()) {
+                OracleColumnInfo columnInfo = new OracleColumnInfo();
+                columnInfo.setName(rs.getString(1));
+                columnInfo.setType(rs.getString(2));
+                columnInfo.setComment(rs.getString(3));
+                columnList.add(columnInfo);
+            }
+        }
+        return columnList;
+    }
+
+    /**
+     * Add columns for Oracle table, skipping the columns that already exist.
+     *
+     * @param conn JDBC Connection {@link Connection}
+     * @param tableName Oracle table name
+     * @param columns Oracle columns to be added
+     * @throws Exception on add columns error
+     */
+    public static void addColumns(Connection conn, String tableName, List<OracleColumnInfo> columns)
+            throws Exception {
+        List<OracleColumnInfo> columnInfos = new ArrayList<>();
+
+        for (OracleColumnInfo columnInfo : columns) {
+            if (!checkColumnExist(conn, tableName, columnInfo.getName())) {
+                columnInfos.add(columnInfo);
+            }
+        }
+        if (columnInfos.isEmpty()) {
+            // every requested column is already present, nothing to execute
+            return;
+        }
+        List<String> addColumnSql = OracleSqlBuilder.buildAddColumnsSql(tableName, columnInfos);
+        executeSqlBatch(conn, addColumnSql);
+    }
+
+    /**
+     * Run a query and read the first column of its first row as an int.
+     * The statement and result set are closed before returning.
+     *
+     * @param conn JDBC Connection {@link Connection}
+     * @param sql query whose first column of the first row is numeric
+     * @return the value read, or 0 when the query returns no row
+     * @throws Exception on execute query SQL error
+     */
+    private static int queryCount(Connection conn, String sql) throws Exception {
+        try (Statement stmt = conn.createStatement();
+                ResultSet resultSet = stmt.executeQuery(sql)) {
+            LOG.info("execute sql [{}] success !", sql);
+            return resultSet.next() ? resultSet.getInt(1) : 0;
+        }
+    }
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/oracle/OracleResourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/oracle/OracleResourceOperator.java
new file mode 100644
index 000000000..b4041e6e2
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/oracle/OracleResourceOperator.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.resource.oracle;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.enums.SinkStatus;
+import org.apache.inlong.manager.common.enums.SinkType;
+import org.apache.inlong.manager.common.exceptions.WorkflowException;
+import org.apache.inlong.manager.common.pojo.sink.SinkInfo;
+import org.apache.inlong.manager.common.pojo.sink.oracle.OracleColumnInfo;
+import org.apache.inlong.manager.common.pojo.sink.oracle.OracleSinkDTO;
+import org.apache.inlong.manager.common.pojo.sink.oracle.OracleTableInfo;
+import org.apache.inlong.manager.dao.entity.StreamSinkFieldEntity;
+import org.apache.inlong.manager.dao.mapper.StreamSinkFieldEntityMapper;
+import org.apache.inlong.manager.service.resource.SinkResourceOperator;
+import org.apache.inlong.manager.service.sink.StreamSinkService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+
+import java.sql.Connection;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Oracle resource operator: creates the Oracle table, and any missing columns, for a sink.
+ *
+ * <p>NOTE(review): this class relies on {@code @Autowired} field injection but carries no Spring
+ * stereotype annotation in this file - confirm that it is registered as a bean elsewhere.
+ */
+public class OracleResourceOperator implements SinkResourceOperator {
+
+    private static final Logger LOG = LoggerFactory.getLogger(OracleResourceOperator.class);
+
+    @Autowired
+    private StreamSinkService sinkService;
+
+    @Autowired
+    private StreamSinkFieldEntityMapper fieldEntityMapper;
+
+    /**
+     * Whether this operator handles the given sink type.
+     *
+     * @param sinkType sink type to test
+     * @return true only for {@link SinkType#ORACLE}
+     */
+    @Override
+    public Boolean accept(SinkType sinkType) {
+        return SinkType.ORACLE == sinkType;
+    }
+
+    /**
+     * Create the Oracle resources for the sink, unless the sink is already configured successfully
+     * or resource creation is disabled for it.
+     *
+     * @param sinkInfo {@link SinkInfo}
+     */
+    @Override
+    public void createSinkResource(SinkInfo sinkInfo) {
+        LOG.info("begin to create Oracle resources sinkId={}", sinkInfo.getId());
+        if (SinkStatus.CONFIG_SUCCESSFUL.getCode().equals(sinkInfo.getStatus())) {
+            LOG.warn("Oracle resource [{}] already success, skip to create", sinkInfo.getId());
+            return;
+        } else if (InlongConstants.DISABLE_CREATE_RESOURCE.equals(sinkInfo.getEnableCreateResource())) {
+            LOG.warn("create resource was disabled, skip to create for [{}]", sinkInfo.getId());
+            return;
+        }
+        this.createTable(sinkInfo);
+    }
+
+    /**
+     * Create Oracle table by SinkInfo.
+     *
+     * <p>Nothing is created, and the sink status is left untouched, when the sink has no fields.
+     * Otherwise the table is created if absent, missing columns are added, and the sink status is
+     * updated to success or failure accordingly.
+     *
+     * @param sinkInfo {@link SinkInfo}
+     * @throws WorkflowException if the table or its columns cannot be created
+     */
+    private void createTable(SinkInfo sinkInfo) {
+        LOG.info("begin to create Oracle table for sinkId={}", sinkInfo.getId());
+        List<StreamSinkFieldEntity> fieldList = fieldEntityMapper.selectBySinkId(sinkInfo.getId());
+        if (CollectionUtils.isEmpty(fieldList)) {
+            LOG.warn("no Oracle fields found, skip to create table for sinkId={}", sinkInfo.getId());
+            // actually skip: a table cannot be created without columns, and the list may be null
+            return;
+        }
+        // set columns
+        List<OracleColumnInfo> columnList = new ArrayList<>();
+        for (StreamSinkFieldEntity field : fieldList) {
+            OracleColumnInfo columnInfo = new OracleColumnInfo();
+            columnInfo.setName(field.getFieldName());
+            columnInfo.setType(field.getFieldType());
+            columnInfo.setComment(field.getFieldComment());
+            columnList.add(columnInfo);
+        }
+
+        OracleSinkDTO oracleSink = OracleSinkDTO.getFromJson(sinkInfo.getExtParams());
+        OracleTableInfo tableInfo = OracleSinkDTO.getTableInfo(oracleSink, columnList);
+        String url = oracleSink.getJdbcUrl();
+        String user = oracleSink.getUsername();
+        String password = oracleSink.getPassword();
+        String tableName = tableInfo.getTableName();
+        // try-with-resources closes the connection exactly once, on success and on failure alike
+        try (Connection conn = OracleJdbcUtils.getConnection(url, user, password)) {
+            // In Oracle, there is no need to consider whether the database exists
+            // 1. If table not exists, create it
+            OracleJdbcUtils.createTable(conn, tableInfo);
+            // 2. table exists, add columns - skip the exists columns
+            OracleJdbcUtils.addColumns(conn, tableName, columnList);
+            // 3. update the sink status to success
+            String info = "success to create Oracle resource";
+            sinkService.updateStatus(sinkInfo.getId(), SinkStatus.CONFIG_SUCCESSFUL.getCode(), info);
+            // log the id only: the ext params of the sink hold the JDBC password
+            LOG.info("{} for sinkId={}", info, sinkInfo.getId());
+        } catch (Throwable e) {
+            String errMsg = "create Oracle table failed: " + e.getMessage();
+            LOG.error(errMsg, e);
+            sinkService.updateStatus(sinkInfo.getId(), SinkStatus.CONFIG_FAILED.getCode(), errMsg);
+            throw new WorkflowException(errMsg);
+        }
+        LOG.info("success create Oracle table for data sink [{}]", sinkInfo.getId());
+    }
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/oracle/OracleSqlBuilder.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/oracle/OracleSqlBuilder.java
new file mode 100644
index 000000000..dcac4b6bb
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/oracle/OracleSqlBuilder.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.resource.oracle;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.pojo.sink.oracle.OracleColumnInfo;
+import org.apache.inlong.manager.common.pojo.sink.oracle.OracleTableInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Builder for the SQL statements used to create and inspect Oracle tables.
+ *
+ * <p>Values placed inside single-quoted SQL string literals are escaped by doubling single quotes.
+ * NOTE(review): identifiers (user, table and column names) and column types are still embedded
+ * verbatim; they are assumed to come from validated sink configuration - confirm with callers.
+ */
+public class OracleSqlBuilder {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(OracleSqlBuilder.class);
+
+    /**
+     * Build SQL to check whether the table exists.
+     *
+     * @param userName Oracle user name that owns the table; compared in upper case
+     * @param tableName Oracle table name; compared as given
+     * @return the check table SQL string
+     */
+    public static String getCheckTable(String userName, String tableName) {
+        StringBuilder sqlBuilder = new StringBuilder();
+        sqlBuilder.append("SELECT COUNT(*) FROM ALL_TABLES WHERE OWNER = UPPER('")
+                .append(escapeLiteral(userName))
+                .append("') ")
+                .append("AND TABLE_NAME = '")
+                .append(escapeLiteral(tableName))
+                .append("' ");
+        LOGGER.info("check table sql: {}", sqlBuilder);
+        return sqlBuilder.toString();
+    }
+
+    /**
+     * Build SQL to check whether the column exists.
+     *
+     * @param tableName Oracle table name
+     * @param columnName Oracle column name
+     * @return the check column SQL string
+     */
+    public static String getCheckColumn(String tableName, String columnName) {
+        StringBuilder sqlBuilder = new StringBuilder();
+        sqlBuilder.append("SELECT count(1) ")
+                .append(" from USER_TAB_COLUMNS where TABLE_NAME= '")
+                .append(escapeLiteral(tableName))
+                .append("' and COLUMN_NAME = '")
+                .append(escapeLiteral(columnName))
+                .append("' ");
+        LOGGER.info("check column sql: {}", sqlBuilder);
+        return sqlBuilder.toString();
+    }
+
+    /**
+     * Build create table SQL by OracleTableInfo.
+     *
+     * @param table Oracle table info {@link OracleTableInfo}
+     * @return the CREATE TABLE statement followed by one COMMENT ON COLUMN statement
+     *         for every column that has a non-blank comment
+     */
+    public static List<String> buildCreateTableSql(OracleTableInfo table) {
+        List<String> sqls = new ArrayList<>();
+        StringBuilder createSql = new StringBuilder();
+        // The table name is double-quoted, so names beginning with an underscore are supported
+        createSql.append("CREATE TABLE ").append(table.getUserName())
+                .append(".\"")
+                .append(table.getTableName())
+                .append("\"");
+        // Construct the column definitions
+        createSql.append(buildCreateColumnsSql(table));
+        sqls.add(createSql.toString());
+        sqls.addAll(getColumnsComment(table.getTableName(), table.getColumns()));
+        LOGGER.info("create table sql: {}", sqls);
+        return sqls;
+    }
+
+    /**
+     * Build add columns SQL.
+     *
+     * @param tableName Oracle table name
+     * @param columnList Oracle column list {@link List}
+     * @return one ALTER TABLE statement per column, followed by one COMMENT ON COLUMN statement
+     *         for every column that has a non-blank comment
+     */
+    public static List<String> buildAddColumnsSql(String tableName, List<OracleColumnInfo> columnList) {
+        List<String> resultList = new ArrayList<>();
+
+        for (OracleColumnInfo columnInfo : columnList) {
+            StringBuilder sqlBuilder = new StringBuilder();
+            sqlBuilder.append("ALTER TABLE \"")
+                    .append(tableName)
+                    .append("\" ADD \"")
+                    .append(columnInfo.getName())
+                    .append("\" ")
+                    .append(columnInfo.getType())
+                    .append(" ");
+            resultList.add(sqlBuilder.toString());
+        }
+        resultList.addAll(getColumnsComment(tableName, columnList));
+        LOGGER.info("add columns sql={}", resultList);
+        return resultList;
+    }
+
+    /**
+     * Build the parenthesized column definition list of a CREATE TABLE statement.
+     *
+     * @param table Oracle table info {@link OracleTableInfo}
+     * @return create column SQL string
+     */
+    private static String buildCreateColumnsSql(OracleTableInfo table) {
+        StringBuilder sql = new StringBuilder();
+        sql.append(" (");
+        List<String> columnList = getColumnsInfo(table.getColumns());
+        sql.append(StringUtils.join(columnList, ","));
+        sql.append(") ");
+        LOGGER.info("create columns sql={}", sql);
+        return sql.toString();
+    }
+
+    /**
+     * Build column info by OracleColumnInfo list.
+     *
+     * @param columns Oracle column info {@link OracleColumnInfo} list
+     * @return one {@code "name" type} definition per column
+     */
+    private static List<String> getColumnsInfo(List<OracleColumnInfo> columns) {
+        List<String> columnList = new ArrayList<>();
+        for (OracleColumnInfo columnInfo : columns) {
+            // The column name is double-quoted; the type is appended as given
+            StringBuilder columnBuilder = new StringBuilder();
+            columnBuilder.append("\"")
+                    .append(columnInfo.getName())
+                    .append("\" ")
+                    .append(columnInfo.getType());
+            columnList.add(columnBuilder.toString());
+        }
+        return columnList;
+    }
+
+    /**
+     * Build the COMMENT ON COLUMN statements for the columns that have a non-blank comment.
+     *
+     * @param tableName Oracle table name
+     * @param columns Oracle column info {@link OracleColumnInfo} list
+     * @return the comment SQL list, empty when no column has a comment
+     */
+    private static List<String> getColumnsComment(String tableName, List<OracleColumnInfo> columns) {
+        List<String> commentList = new ArrayList<>();
+        for (OracleColumnInfo columnInfo : columns) {
+            if (StringUtils.isNotBlank(columnInfo.getComment())) {
+                StringBuilder commSql = new StringBuilder();
+                commSql.append("COMMENT ON COLUMN \"")
+                        .append(tableName)
+                        .append("\".\"")
+                        .append(columnInfo.getName())
+                        .append("\" IS '")
+                        // a comment containing an apostrophe would otherwise end the literal early
+                        .append(escapeLiteral(columnInfo.getComment()))
+                        .append("' ");
+                commentList.add(commSql.toString());
+            }
+        }
+        return commentList;
+    }
+
+    /**
+     * Build query table SQL.
+     *
+     * @param tableName Oracle table name
+     * @return SQL selecting column name, data type and comment of every column of the table
+     */
+    public static String buildDescTableSql(String tableName) {
+        StringBuilder sql = new StringBuilder();
+        sql.append("SELECT A.COLUMN_NAME,A.DATA_TYPE,B.COMMENTS ")
+                .append(" FROM USER_TAB_COLUMNS A LEFT JOIN USER_COL_COMMENTS B ")
+                .append(" ON A.TABLE_NAME=B.TABLE_NAME AND A.COLUMN_NAME=B.COLUMN_NAME ")
+                .append("WHERE A.TABLE_NAME = '")
+                .append(escapeLiteral(tableName))
+                .append("' ORDER BY A.COLUMN_ID ");
+        LOGGER.info("desc table sql={}", sql);
+        return sql.toString();
+    }
+
+    /**
+     * Escape a value for use inside a single-quoted SQL string literal by doubling single quotes.
+     *
+     * @param value raw value, may be null
+     * @return the escaped value, or null when the input is null
+     */
+    private static String escapeLiteral(String value) {
+        return StringUtils.replace(value, "'", "''");
+    }
+
+}
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/OracleSinkServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/OracleSinkServiceTest.java
index dfac64cd9..ea0d32f77 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/OracleSinkServiceTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/sink/OracleSinkServiceTest.java
@@ -21,16 +21,21 @@ import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.SinkType;
import org.apache.inlong.manager.common.pojo.sink.SinkField;
import org.apache.inlong.manager.common.pojo.sink.StreamSink;
+import org.apache.inlong.manager.common.pojo.sink.oracle.OracleColumnInfo;
import org.apache.inlong.manager.common.pojo.sink.oracle.OracleSink;
import org.apache.inlong.manager.common.pojo.sink.oracle.OracleSinkRequest;
+import org.apache.inlong.manager.common.pojo.sink.oracle.OracleTableInfo;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.service.ServiceBaseTest;
import org.apache.inlong.manager.service.core.impl.InlongStreamServiceTest;
+import org.apache.inlong.manager.service.resource.oracle.OracleJdbcUtils;
import org.apache.inlong.manager.service.sink.StreamSinkService;
import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
+import java.sql.Connection;
import java.util.ArrayList;
import java.util.List;
@@ -61,7 +66,7 @@ public class OracleSinkServiceTest extends ServiceBaseTest {
sinkInfo.setInlongStreamId(globalStreamId);
sinkInfo.setSinkType(SinkType.SINK_ORACLE);
- sinkInfo.setJdbcUrl("jdbc:oracle://host:port/database");
+ sinkInfo.setJdbcUrl("jdbc:oracle:thin@host:port/database");
sinkInfo.setUsername("oracle");
sinkInfo.setPassword("inlong");
sinkInfo.setTableName("user");
@@ -110,4 +115,80 @@ public class OracleSinkServiceTest extends ServiceBaseTest {
deleteSink(sinkId);
}
+    /**
+     * Just using in local test.
+     *
+     * <p>Creates a table, adds columns to it and checks the resulting column count. Disabled by
+     * default because it needs the Oracle instance addressed by the hard-coded url below.
+     *
+     * @throws Exception if the connection or any SQL statement fails, so that the test fails
+     *         instead of passing silently
+     */
+    @Test
+    @Disabled
+    public void testDbResource() throws Exception {
+        String url = "jdbc:oracle:thin:@localhost:1521/ORCLCDB";
+        String username = "c###inlong_test";
+        String password = "123456";
+        String tableName = "test02";
+
+        // try-with-resources releases the connection even when a step or the assertion fails
+        try (Connection connection = OracleJdbcUtils.getConnection(url, username, password)) {
+            OracleTableInfo tableInfo = bulidTestOracleTableInfo(username, tableName);
+            OracleJdbcUtils.createTable(connection, tableInfo);
+            List<OracleColumnInfo> addColumns = buildAddColumns();
+            OracleJdbcUtils.addColumns(connection, tableName, addColumns);
+            List<OracleColumnInfo> columns = OracleJdbcUtils.getColumns(connection, tableName);
+            // expected value first, actual value second
+            Assertions.assertEquals(tableInfo.getColumns().size() + addColumns.size(), columns.size());
+        }
+    }
+
+    /**
+     * Build the two Oracle columns that the local test adds to an existing table.
+     *
+     * @return a mutable list holding the columns test1 and test2, in that order
+     */
+    private List<OracleColumnInfo> buildAddColumns() {
+        final List<OracleColumnInfo> extraColumns = new ArrayList<>();
+        extraColumns.add(new OracleColumnInfo("test1", "NUMBER(16)", "test1"));
+        extraColumns.add(new OracleColumnInfo("test2", "VARCHAR2(10)", "test2"));
+        return extraColumns;
+    }
+
+    /**
+     * Build the Oracle table info used by the local test: columns id, cell and name.
+     *
+     * @param userName Oracle user name to set on the table info
+     * @param tableName Oracle table name
+     * @return {@link OracleTableInfo} with the table name, user name and three columns set
+     */
+    private OracleTableInfo bulidTestOracleTableInfo(String userName, String tableName) {
+        final List<OracleColumnInfo> testColumns = new ArrayList<>();
+        testColumns.add(new OracleColumnInfo("id", "NUMBER(6)", "id"));
+        testColumns.add(new OracleColumnInfo("cell", "VARCHAR2(10)", "cell"));
+        testColumns.add(new OracleColumnInfo("name", "VARCHAR2(20)", "name"));
+
+        final OracleTableInfo testTable = new OracleTableInfo();
+        testTable.setTableName(tableName);
+        testTable.setColumns(testColumns);
+        testTable.setUserName(userName);
+        return testTable;
+    }
+
}