You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2023/01/23 04:11:59 UTC

[doris] 10/11: [Fix](Oracle External Table) fix that oracle external table can not insert batch values (#16117)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git

commit c32a372b3cb8ddfb0481133b51f7ef19ceab4d28
Author: Tiewei Fang <43...@users.noreply.github.com>
AuthorDate: Sat Jan 21 07:57:12 2023 +0800

    [Fix](Oracle External Table) fix that oracle external table can not insert batch values (#16117)
    
    Issue Number: close #xxx
    
    This PR fixes two bugs:
    
    1. _jdbc_scanner may be nullptr in vjdbc_connector.cpp, so the connector now records the JDBC statistics itself instead of writing to the scanner's counters. Closes "[Enhencement](jdbc scanner) add profile for jdbc scanner" (#15914).
    2. For batch insertion, Oracle does not support the syntax `insert into tables values (...),(...);`. What it supports is:
    insert all
    into table(col1,col2) values(c1v1, c2v1)
    into table(col1,col2) values(c1v2, c2v2)
    SELECT 1 FROM DUAL;
---
 be/src/exec/table_connector.cpp                    | 41 +++++++++++++++++++++-
 be/src/exec/table_connector.h                      |  9 +++++
 be/src/vec/exec/scan/new_jdbc_scanner.cpp          | 15 +++++++-
 be/src/vec/exec/scan/new_jdbc_scanner.h            |  2 ++
 be/src/vec/exec/vjdbc_connector.cpp                | 20 ++++-------
 be/src/vec/exec/vjdbc_connector.h                  | 16 +++++++--
 .../java/org/apache/doris/udf/JdbcExecutor.java    |  1 -
 7 files changed, 85 insertions(+), 19 deletions(-)

diff --git a/be/src/exec/table_connector.cpp b/be/src/exec/table_connector.cpp
index 0e07edd48d..f2c3ff8101 100644
--- a/be/src/exec/table_connector.cpp
+++ b/be/src/exec/table_connector.cpp
@@ -166,7 +166,14 @@ Status TableConnector::append(const std::string& table_name, vectorized::Block*
                               TOdbcTableType::type table_type) {
     _insert_stmt_buffer.clear();
     std::u16string insert_stmt;
-    {
+    if (table_type == TOdbcTableType::ORACLE) {
+        SCOPED_TIMER(_convert_tuple_timer);
+        oracle_type_append(table_name, block, output_vexpr_ctxs, start_send_row, num_rows_sent,
+                           table_type);
+        // Translate utf8 string to utf16 to use unicode encoding
+        insert_stmt = utf8_to_u16string(_insert_stmt_buffer.data(),
+                                        _insert_stmt_buffer.data() + _insert_stmt_buffer.size());
+    } else {
         SCOPED_TIMER(_convert_tuple_timer);
         fmt::format_to(_insert_stmt_buffer, "INSERT INTO {} VALUES (", table_name);
 
@@ -203,6 +210,38 @@ Status TableConnector::append(const std::string& table_name, vectorized::Block*
     return Status::OK();
 }
 
+Status TableConnector::oracle_type_append( // appends one Oracle "INSERT ALL ... SELECT 1 FROM DUAL" statement to _insert_stmt_buffer
+        const std::string& table_name, vectorized::Block* block,
+        const std::vector<vectorized::VExprContext*>& output_vexpr_ctxs, uint32_t start_send_row,
+        uint32_t* num_rows_sent, TOdbcTableType::type table_type) { // returns the first conversion error, otherwise Status::OK()
+    fmt::format_to(_insert_stmt_buffer, "INSERT ALL "); // the buffer is not cleared here; the visible caller clears it first
+    int num_rows = block->rows();
+    int num_columns = block->columns();
+    for (int i = start_send_row; i < num_rows; ++i) { // one "INTO <table> VALUES (...)" clause per row
+        (*num_rows_sent)++; // counted before conversion, so a row whose conversion fails is still counted
+        fmt::format_to(_insert_stmt_buffer, "INTO {} VALUES (", table_name);
+        // Emit the row's column values separated by ", "
+        for (int j = 0; j < num_columns; ++j) {
+            if (j != 0) {
+                fmt::format_to(_insert_stmt_buffer, "{}", ", ");
+            }
+            auto& column_ptr = block->get_by_position(j).column;
+            auto& type_ptr = block->get_by_position(j).type;
+            RETURN_IF_ERROR(convert_column_data( // presumably appends the value text to the buffer (defined elsewhere) - confirm
+                    column_ptr, type_ptr, output_vexpr_ctxs[j]->root()->type(), i, table_type)); // NOTE(review): assumes output_vexpr_ctxs has >= num_columns entries
+        }
+
+        if (i < num_rows - 1 && _insert_stmt_buffer.size() < INSERT_BUFFER_SIZE) { // more rows remain and the buffer is still below the limit
+            fmt::format_to(_insert_stmt_buffer, "{}", ") "); // close this clause and continue with the next row
+        } else {
+            // last row of the block, or the buffer reached INSERT_BUFFER_SIZE: finish the statement and stop
+            fmt::format_to(_insert_stmt_buffer, "{}", ") SELECT 1 FROM DUAL"); // trailing subquery of the INSERT ALL form
+            break;
+        }
+    }
+    return Status::OK(); // NOTE(review): if start_send_row >= num_rows the loop never runs and only "INSERT ALL " was appended - confirm callers avoid this
+}
+
 Status TableConnector::convert_column_data(const vectorized::ColumnPtr& column_ptr,
                                            const vectorized::DataTypePtr& type_ptr,
                                            const TypeDescriptor& type, int row,
diff --git a/be/src/exec/table_connector.h b/be/src/exec/table_connector.h
index f051776b49..3fe0ec5721 100644
--- a/be/src/exec/table_connector.h
+++ b/be/src/exec/table_connector.h
@@ -87,6 +87,15 @@ protected:
     RuntimeProfile::Counter* _result_send_timer = nullptr;
     // number of sent rows
     RuntimeProfile::Counter* _sent_rows_counter = nullptr;
+
+private:
+    // Oracle does not support the multi-row form
+    // insert into tables values (...),(...);
+    // so batch inserts for Oracle are built by this dedicated helper.
+    Status oracle_type_append(const std::string& table_name, vectorized::Block* block,
+                              const std::vector<vectorized::VExprContext*>& output_vexpr_ctxs,
+                              uint32_t start_send_row, uint32_t* num_rows_sent,
+                              TOdbcTableType::type table_type);
 };
 
 } // namespace doris
diff --git a/be/src/vec/exec/scan/new_jdbc_scanner.cpp b/be/src/vec/exec/scan/new_jdbc_scanner.cpp
index 80fc3669c8..0091efc7cd 100644
--- a/be/src/vec/exec/scan/new_jdbc_scanner.cpp
+++ b/be/src/vec/exec/scan/new_jdbc_scanner.cpp
@@ -18,6 +18,7 @@
 #include "new_jdbc_scanner.h"
 
 #include "util/runtime_profile.h"
+#include "vec/exec/vjdbc_connector.h"
 
 namespace doris::vectorized {
 NewJdbcScanner::NewJdbcScanner(RuntimeState* state, NewJdbcScanNode* parent, int64_t limit,
@@ -76,7 +77,7 @@ Status NewJdbcScanner::prepare(RuntimeState* state, VExprContext** vconjunct_ctx
     _jdbc_param.query_string = std::move(_query_string);
     _jdbc_param.table_type = _table_type;
 
-    _jdbc_connector.reset(new (std::nothrow) JdbcConnector(this, _jdbc_param));
+    _jdbc_connector.reset(new (std::nothrow) JdbcConnector(_jdbc_param));
     if (_jdbc_connector == nullptr) {
         return Status::InternalError("new a jdbc scanner failed.");
     }
@@ -113,6 +114,7 @@ Status NewJdbcScanner::_get_block_impl(RuntimeState* state, Block* block, bool*
 
     if (_jdbc_eos == true) {
         *eof = true;
+        _update_profile();
         return Status::OK();
     }
 
@@ -138,6 +140,7 @@ Status NewJdbcScanner::_get_block_impl(RuntimeState* state, Block* block, bool*
 
         if (_jdbc_eos == true) {
             if (block->rows() == 0) {
+                _update_profile();
                 *eof = true;
             }
             break;
@@ -160,6 +163,16 @@ Status NewJdbcScanner::_get_block_impl(RuntimeState* state, Block* block, bool*
     return Status::OK();
 }
 
+void NewJdbcScanner::_update_profile() { // feeds the connector's recorded timings into this scanner's profile counters
+    JdbcConnector::JdbcStatistic& jdbc_statistic = _jdbc_connector->get_jdbc_statistic(); // reference to the connector-owned struct, no copy
+    COUNTER_UPDATE(_load_jar_timer, jdbc_statistic._load_jar_timer); // NOTE(review): if COUNTER_UPDATE accumulates, calling this twice would double count - confirm
+    COUNTER_UPDATE(_init_connector_timer, jdbc_statistic._init_connector_timer);
+    COUNTER_UPDATE(_check_type_timer, jdbc_statistic._check_type_timer);
+    COUNTER_UPDATE(_get_data_timer, jdbc_statistic._get_data_timer);
+    COUNTER_UPDATE(_execte_read_timer, jdbc_statistic._execte_read_timer);
+    COUNTER_UPDATE(_connector_close_timer, jdbc_statistic._connector_close_timer); // NOTE(review): the visible call sites run at EOF, before close(); this value may still be 0 - confirm
+}
+
 Status NewJdbcScanner::close(RuntimeState* state) {
     RETURN_IF_ERROR(VScanner::close(state));
     RETURN_IF_ERROR(_jdbc_connector->close());
diff --git a/be/src/vec/exec/scan/new_jdbc_scanner.h b/be/src/vec/exec/scan/new_jdbc_scanner.h
index 4f869d0f41..da5a3e7faf 100644
--- a/be/src/vec/exec/scan/new_jdbc_scanner.h
+++ b/be/src/vec/exec/scan/new_jdbc_scanner.h
@@ -48,6 +48,8 @@ protected:
     RuntimeProfile::Counter* _connector_close_timer = nullptr;
 
 private:
+    void _update_profile();
+
     bool _is_init;
 
     bool _jdbc_eos;
diff --git a/be/src/vec/exec/vjdbc_connector.cpp b/be/src/vec/exec/vjdbc_connector.cpp
index 5752e7ab19..519ccbbc77 100644
--- a/be/src/vec/exec/vjdbc_connector.cpp
+++ b/be/src/vec/exec/vjdbc_connector.cpp
@@ -53,12 +53,6 @@ JdbcConnector::JdbcConnector(const JdbcConnectorParam& param)
           _conn_param(param),
           _closed(false) {}
 
-JdbcConnector::JdbcConnector(NewJdbcScanner* jdbc_scanner, const JdbcConnectorParam& param)
-        : TableConnector(param.tuple_desc, param.query_string),
-          _jdbc_scanner(jdbc_scanner),
-          _conn_param(param),
-          _closed(false) {}
-
 JdbcConnector::~JdbcConnector() {
     if (!_closed) {
         close();
@@ -71,7 +65,7 @@ JdbcConnector::~JdbcConnector() {
 #define DELETE_BASIC_JAVA_CLAZZ_REF(CPP_TYPE) env->DeleteGlobalRef(_executor_##CPP_TYPE##_clazz);
 
 Status JdbcConnector::close() {
-    SCOPED_TIMER(_jdbc_scanner->_connector_close_timer);
+    SCOPED_RAW_TIMER(&_jdbc_statistic._connector_close_timer);
     _closed = true;
     if (!_is_open) {
         return Status::OK();
@@ -132,12 +126,12 @@ Status JdbcConnector::open(RuntimeState* state, bool read) {
         if (_conn_param.resource_name.empty()) {
             // for jdbcExternalTable, _conn_param.resource_name == ""
             // so, we use _conn_param.driver_path as key of jarpath
-            SCOPED_TIMER(_jdbc_scanner->_load_jar_timer);
+            SCOPED_RAW_TIMER(&_jdbc_statistic._load_jar_timer);
             RETURN_IF_ERROR(function_cache->get_jarpath(
                     std::abs((int64_t)hash_str(_conn_param.driver_path)), _conn_param.driver_path,
                     _conn_param.driver_checksum, &local_location));
         } else {
-            SCOPED_TIMER(_jdbc_scanner->_load_jar_timer);
+            SCOPED_RAW_TIMER(&_jdbc_statistic._load_jar_timer);
             RETURN_IF_ERROR(function_cache->get_jarpath(
                     std::abs((int64_t)hash_str(_conn_param.resource_name)), _conn_param.driver_path,
                     _conn_param.driver_checksum, &local_location));
@@ -158,7 +152,7 @@ Status JdbcConnector::open(RuntimeState* state, bool read) {
         RETURN_IF_ERROR(jni_frame.push(env));
         RETURN_IF_ERROR(SerializeThriftMsg(env, &ctor_params, &ctor_params_bytes));
         {
-            SCOPED_TIMER(_jdbc_scanner->_init_connector_timer);
+            SCOPED_RAW_TIMER(&_jdbc_statistic._init_connector_timer);
             _executor_obj = env->NewObject(_executor_clazz, _executor_ctor_id, ctor_params_bytes);
         }
         jbyte* pBytes = env->GetByteArrayElements(ctor_params_bytes, nullptr);
@@ -186,7 +180,7 @@ Status JdbcConnector::query() {
     JNIEnv* env = nullptr;
     RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env));
     {
-        SCOPED_TIMER(_jdbc_scanner->_execte_read_timer);
+        SCOPED_RAW_TIMER(&_jdbc_statistic._execte_read_timer);
         jint colunm_count =
                 env->CallNonvirtualIntMethod(_executor_obj, _executor_clazz, _executor_read_id);
         RETURN_IF_ERROR(JniUtil::GetJniExceptionMsg(env));
@@ -201,7 +195,7 @@ Status JdbcConnector::query() {
 }
 
 Status JdbcConnector::_check_column_type() {
-    SCOPED_TIMER(_jdbc_scanner->_check_type_timer);
+    SCOPED_RAW_TIMER(&_jdbc_statistic._check_type_timer);
     JNIEnv* env = nullptr;
     RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env));
     jobject type_lists =
@@ -350,7 +344,7 @@ Status JdbcConnector::get_next(bool* eos, std::vector<MutableColumnPtr>& columns
     if (!_is_open) {
         return Status::InternalError("get_next before open of jdbc connector.");
     }
-    SCOPED_TIMER(_jdbc_scanner->_get_data_timer);
+    SCOPED_RAW_TIMER(&_jdbc_statistic._get_data_timer);
     JNIEnv* env = nullptr;
     RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env));
     jboolean has_next =
diff --git a/be/src/vec/exec/vjdbc_connector.h b/be/src/vec/exec/vjdbc_connector.h
index ee99be8ec5..c1d416783c 100644
--- a/be/src/vec/exec/vjdbc_connector.h
+++ b/be/src/vec/exec/vjdbc_connector.h
@@ -47,9 +47,16 @@ struct JdbcConnectorParam {
 
 class JdbcConnector : public TableConnector {
 public:
-    JdbcConnector(const JdbcConnectorParam& param);
+    struct JdbcStatistic { // raw timing accumulators filled through SCOPED_RAW_TIMER in vjdbc_connector.cpp (units not shown here - presumably ns, confirm)
+        int64_t _load_jar_timer = 0; // time spent in function_cache->get_jarpath() during open()
+        int64_t _init_connector_timer = 0; // time spent constructing the Java executor object (env->NewObject) in open()
+        int64_t _get_data_timer = 0; // time spent in get_next()
+        int64_t _check_type_timer = 0; // time spent in _check_column_type()
+        int64_t _execte_read_timer = 0; // time spent on the executor read call in query()
+        int64_t _connector_close_timer = 0; // time spent in close()
+    };
 
-    JdbcConnector(NewJdbcScanner* jdbc_scanner, const JdbcConnectorParam& param);
+    JdbcConnector(const JdbcConnectorParam& param);
 
     ~JdbcConnector() override;
 
@@ -68,6 +75,8 @@ public:
     Status abort_trans() override; // should be call after transaction abort
     Status finish_trans() override; // should be call after transaction commit
 
+    JdbcStatistic& get_jdbc_statistic() { return _jdbc_statistic; } // mutable reference to this connector's timing accumulators
+
     Status close() override;
 
 private:
@@ -89,7 +98,6 @@ private:
     Status _cast_string_to_array(const SlotDescriptor* slot_desc, Block* block, int column_index,
                                  int rows);
 
-    NewJdbcScanner* _jdbc_scanner;
     const JdbcConnectorParam& _conn_param;
     //java.sql.Types: https://docs.oracle.com/javase/7/docs/api/constant-values.html#java.sql.Types.INTEGER
     std::map<int, PrimitiveType> _arr_jdbc_map {
@@ -126,6 +134,8 @@ private:
     std::vector<MutableColumnPtr>
             str_array_cols; // for array type to save data like big string [1,2,3]
 
+    JdbcStatistic _jdbc_statistic;
+
 #define FUNC_VARI_DECLARE(RETURN_TYPE)                                \
     RETURN_TYPE _jobject_to_##RETURN_TYPE(JNIEnv* env, jobject jobj); \
     jclass _executor_##RETURN_TYPE##_clazz;
diff --git a/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcExecutor.java b/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcExecutor.java
index 0c9894ad18..d90aa4055a 100644
--- a/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcExecutor.java
+++ b/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcExecutor.java
@@ -272,4 +272,3 @@ public class JdbcExecutor {
         }
     }
 }
-


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org