You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2023/01/23 04:11:59 UTC
[doris] 10/11: [Fix](Oracle External Table) fix that oracle external table can not insert batch values (#16117)
This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git
commit c32a372b3cb8ddfb0481133b51f7ef19ceab4d28
Author: Tiewei Fang <43...@users.noreply.github.com>
AuthorDate: Sat Jan 21 07:57:12 2023 +0800
[Fix](Oracle External Table) fix that oracle external table can not insert batch values (#16117)
Issue Number: close #xxx
This PR fixes two bugs:
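1. _jdbc_scanner may be a nullptr in vjdbc_connector.cpp because JdbcConnector can be constructed without a scanner. The JDBC timings are therefore now accumulated in a JdbcStatistic struct owned by JdbcConnector, and NewJdbcScanner::_update_profile() copies them into the scanner's profile counters. Closes [Enhencement](jdbc scanner) add profile for jdbc scanner #15914.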
2. In the batch-insertion scenario, Oracle does not support the multi-row syntax insert into table values (...),(...); instead, it supports:
insert all
into table(col1,col2) values(c1v1, c2v1)
into table(col1,col2) values(c1v2, c2v2)
SELECT 1 FROM DUAL;
---
be/src/exec/table_connector.cpp | 41 +++++++++++++++++++++-
be/src/exec/table_connector.h | 9 +++++
be/src/vec/exec/scan/new_jdbc_scanner.cpp | 15 +++++++-
be/src/vec/exec/scan/new_jdbc_scanner.h | 2 ++
be/src/vec/exec/vjdbc_connector.cpp | 20 ++++-------
be/src/vec/exec/vjdbc_connector.h | 16 +++++++--
.../java/org/apache/doris/udf/JdbcExecutor.java | 1 -
7 files changed, 85 insertions(+), 19 deletions(-)
diff --git a/be/src/exec/table_connector.cpp b/be/src/exec/table_connector.cpp
index 0e07edd48d..f2c3ff8101 100644
--- a/be/src/exec/table_connector.cpp
+++ b/be/src/exec/table_connector.cpp
@@ -166,7 +166,14 @@ Status TableConnector::append(const std::string& table_name, vectorized::Block*
TOdbcTableType::type table_type) {
_insert_stmt_buffer.clear();
std::u16string insert_stmt;
- {
+ if (table_type == TOdbcTableType::ORACLE) {
+ SCOPED_TIMER(_convert_tuple_timer);
+ oracle_type_append(table_name, block, output_vexpr_ctxs, start_send_row, num_rows_sent,
+ table_type);
+ // Translate utf8 string to utf16 to use unicode encoding
+ insert_stmt = utf8_to_u16string(_insert_stmt_buffer.data(),
+ _insert_stmt_buffer.data() + _insert_stmt_buffer.size());
+ } else {
SCOPED_TIMER(_convert_tuple_timer);
fmt::format_to(_insert_stmt_buffer, "INSERT INTO {} VALUES (", table_name);
@@ -203,6 +210,38 @@ Status TableConnector::append(const std::string& table_name, vectorized::Block*
return Status::OK();
}
+Status TableConnector::oracle_type_append(
+ const std::string& table_name, vectorized::Block* block,
+ const std::vector<vectorized::VExprContext*>& output_vexpr_ctxs, uint32_t start_send_row,
+ uint32_t* num_rows_sent, TOdbcTableType::type table_type) {
+ fmt::format_to(_insert_stmt_buffer, "INSERT ALL ");
+ int num_rows = block->rows();
+ int num_columns = block->columns();
+ for (int i = start_send_row; i < num_rows; ++i) {
+ (*num_rows_sent)++;
+ fmt::format_to(_insert_stmt_buffer, "INTO {} VALUES (", table_name);
+ // Construct insert statement of odbc/jdbc table
+ for (int j = 0; j < num_columns; ++j) {
+ if (j != 0) {
+ fmt::format_to(_insert_stmt_buffer, "{}", ", ");
+ }
+ auto& column_ptr = block->get_by_position(j).column;
+ auto& type_ptr = block->get_by_position(j).type;
+ RETURN_IF_ERROR(convert_column_data(
+ column_ptr, type_ptr, output_vexpr_ctxs[j]->root()->type(), i, table_type));
+ }
+
+ if (i < num_rows - 1 && _insert_stmt_buffer.size() < INSERT_BUFFER_SIZE) {
+ fmt::format_to(_insert_stmt_buffer, "{}", ") ");
+ } else {
+ // batch exhausted or _insert_stmt_buffer is full, need to do real insert stmt
+ fmt::format_to(_insert_stmt_buffer, "{}", ") SELECT 1 FROM DUAL");
+ break;
+ }
+ }
+ return Status::OK();
+}
+
Status TableConnector::convert_column_data(const vectorized::ColumnPtr& column_ptr,
const vectorized::DataTypePtr& type_ptr,
const TypeDescriptor& type, int row,
diff --git a/be/src/exec/table_connector.h b/be/src/exec/table_connector.h
index f051776b49..3fe0ec5721 100644
--- a/be/src/exec/table_connector.h
+++ b/be/src/exec/table_connector.h
@@ -87,6 +87,15 @@ protected:
RuntimeProfile::Counter* _result_send_timer = nullptr;
// number of sent rows
RuntimeProfile::Counter* _sent_rows_counter = nullptr;
+
+private:
+ // Because Oracle database do not support
+ // insert into tables values (...),(...);
+ // Here we do something special for Oracle.
+ Status oracle_type_append(const std::string& table_name, vectorized::Block* block,
+ const std::vector<vectorized::VExprContext*>& output_vexpr_ctxs,
+ uint32_t start_send_row, uint32_t* num_rows_sent,
+ TOdbcTableType::type table_type);
};
} // namespace doris
diff --git a/be/src/vec/exec/scan/new_jdbc_scanner.cpp b/be/src/vec/exec/scan/new_jdbc_scanner.cpp
index 80fc3669c8..0091efc7cd 100644
--- a/be/src/vec/exec/scan/new_jdbc_scanner.cpp
+++ b/be/src/vec/exec/scan/new_jdbc_scanner.cpp
@@ -18,6 +18,7 @@
#include "new_jdbc_scanner.h"
#include "util/runtime_profile.h"
+#include "vec/exec/vjdbc_connector.h"
namespace doris::vectorized {
NewJdbcScanner::NewJdbcScanner(RuntimeState* state, NewJdbcScanNode* parent, int64_t limit,
@@ -76,7 +77,7 @@ Status NewJdbcScanner::prepare(RuntimeState* state, VExprContext** vconjunct_ctx
_jdbc_param.query_string = std::move(_query_string);
_jdbc_param.table_type = _table_type;
- _jdbc_connector.reset(new (std::nothrow) JdbcConnector(this, _jdbc_param));
+ _jdbc_connector.reset(new (std::nothrow) JdbcConnector(_jdbc_param));
if (_jdbc_connector == nullptr) {
return Status::InternalError("new a jdbc scanner failed.");
}
@@ -113,6 +114,7 @@ Status NewJdbcScanner::_get_block_impl(RuntimeState* state, Block* block, bool*
if (_jdbc_eos == true) {
*eof = true;
+ _update_profile();
return Status::OK();
}
@@ -138,6 +140,7 @@ Status NewJdbcScanner::_get_block_impl(RuntimeState* state, Block* block, bool*
if (_jdbc_eos == true) {
if (block->rows() == 0) {
+ _update_profile();
*eof = true;
}
break;
@@ -160,6 +163,16 @@ Status NewJdbcScanner::_get_block_impl(RuntimeState* state, Block* block, bool*
return Status::OK();
}
+void NewJdbcScanner::_update_profile() {
+ JdbcConnector::JdbcStatistic& jdbc_statistic = _jdbc_connector->get_jdbc_statistic();
+ COUNTER_UPDATE(_load_jar_timer, jdbc_statistic._load_jar_timer);
+ COUNTER_UPDATE(_init_connector_timer, jdbc_statistic._init_connector_timer);
+ COUNTER_UPDATE(_check_type_timer, jdbc_statistic._check_type_timer);
+ COUNTER_UPDATE(_get_data_timer, jdbc_statistic._get_data_timer);
+ COUNTER_UPDATE(_execte_read_timer, jdbc_statistic._execte_read_timer);
+ COUNTER_UPDATE(_connector_close_timer, jdbc_statistic._connector_close_timer);
+}
+
Status NewJdbcScanner::close(RuntimeState* state) {
RETURN_IF_ERROR(VScanner::close(state));
RETURN_IF_ERROR(_jdbc_connector->close());
diff --git a/be/src/vec/exec/scan/new_jdbc_scanner.h b/be/src/vec/exec/scan/new_jdbc_scanner.h
index 4f869d0f41..da5a3e7faf 100644
--- a/be/src/vec/exec/scan/new_jdbc_scanner.h
+++ b/be/src/vec/exec/scan/new_jdbc_scanner.h
@@ -48,6 +48,8 @@ protected:
RuntimeProfile::Counter* _connector_close_timer = nullptr;
private:
+ void _update_profile();
+
bool _is_init;
bool _jdbc_eos;
diff --git a/be/src/vec/exec/vjdbc_connector.cpp b/be/src/vec/exec/vjdbc_connector.cpp
index 5752e7ab19..519ccbbc77 100644
--- a/be/src/vec/exec/vjdbc_connector.cpp
+++ b/be/src/vec/exec/vjdbc_connector.cpp
@@ -53,12 +53,6 @@ JdbcConnector::JdbcConnector(const JdbcConnectorParam& param)
_conn_param(param),
_closed(false) {}
-JdbcConnector::JdbcConnector(NewJdbcScanner* jdbc_scanner, const JdbcConnectorParam& param)
- : TableConnector(param.tuple_desc, param.query_string),
- _jdbc_scanner(jdbc_scanner),
- _conn_param(param),
- _closed(false) {}
-
JdbcConnector::~JdbcConnector() {
if (!_closed) {
close();
@@ -71,7 +65,7 @@ JdbcConnector::~JdbcConnector() {
#define DELETE_BASIC_JAVA_CLAZZ_REF(CPP_TYPE) env->DeleteGlobalRef(_executor_##CPP_TYPE##_clazz);
Status JdbcConnector::close() {
- SCOPED_TIMER(_jdbc_scanner->_connector_close_timer);
+ SCOPED_RAW_TIMER(&_jdbc_statistic._connector_close_timer);
_closed = true;
if (!_is_open) {
return Status::OK();
@@ -132,12 +126,12 @@ Status JdbcConnector::open(RuntimeState* state, bool read) {
if (_conn_param.resource_name.empty()) {
// for jdbcExternalTable, _conn_param.resource_name == ""
// so, we use _conn_param.driver_path as key of jarpath
- SCOPED_TIMER(_jdbc_scanner->_load_jar_timer);
+ SCOPED_RAW_TIMER(&_jdbc_statistic._load_jar_timer);
RETURN_IF_ERROR(function_cache->get_jarpath(
std::abs((int64_t)hash_str(_conn_param.driver_path)), _conn_param.driver_path,
_conn_param.driver_checksum, &local_location));
} else {
- SCOPED_TIMER(_jdbc_scanner->_load_jar_timer);
+ SCOPED_RAW_TIMER(&_jdbc_statistic._load_jar_timer);
RETURN_IF_ERROR(function_cache->get_jarpath(
std::abs((int64_t)hash_str(_conn_param.resource_name)), _conn_param.driver_path,
_conn_param.driver_checksum, &local_location));
@@ -158,7 +152,7 @@ Status JdbcConnector::open(RuntimeState* state, bool read) {
RETURN_IF_ERROR(jni_frame.push(env));
RETURN_IF_ERROR(SerializeThriftMsg(env, &ctor_params, &ctor_params_bytes));
{
- SCOPED_TIMER(_jdbc_scanner->_init_connector_timer);
+ SCOPED_RAW_TIMER(&_jdbc_statistic._init_connector_timer);
_executor_obj = env->NewObject(_executor_clazz, _executor_ctor_id, ctor_params_bytes);
}
jbyte* pBytes = env->GetByteArrayElements(ctor_params_bytes, nullptr);
@@ -186,7 +180,7 @@ Status JdbcConnector::query() {
JNIEnv* env = nullptr;
RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env));
{
- SCOPED_TIMER(_jdbc_scanner->_execte_read_timer);
+ SCOPED_RAW_TIMER(&_jdbc_statistic._execte_read_timer);
jint colunm_count =
env->CallNonvirtualIntMethod(_executor_obj, _executor_clazz, _executor_read_id);
RETURN_IF_ERROR(JniUtil::GetJniExceptionMsg(env));
@@ -201,7 +195,7 @@ Status JdbcConnector::query() {
}
Status JdbcConnector::_check_column_type() {
- SCOPED_TIMER(_jdbc_scanner->_check_type_timer);
+ SCOPED_RAW_TIMER(&_jdbc_statistic._check_type_timer);
JNIEnv* env = nullptr;
RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env));
jobject type_lists =
@@ -350,7 +344,7 @@ Status JdbcConnector::get_next(bool* eos, std::vector<MutableColumnPtr>& columns
if (!_is_open) {
return Status::InternalError("get_next before open of jdbc connector.");
}
- SCOPED_TIMER(_jdbc_scanner->_get_data_timer);
+ SCOPED_RAW_TIMER(&_jdbc_statistic._get_data_timer);
JNIEnv* env = nullptr;
RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env));
jboolean has_next =
diff --git a/be/src/vec/exec/vjdbc_connector.h b/be/src/vec/exec/vjdbc_connector.h
index ee99be8ec5..c1d416783c 100644
--- a/be/src/vec/exec/vjdbc_connector.h
+++ b/be/src/vec/exec/vjdbc_connector.h
@@ -47,9 +47,16 @@ struct JdbcConnectorParam {
class JdbcConnector : public TableConnector {
public:
- JdbcConnector(const JdbcConnectorParam& param);
+ struct JdbcStatistic {
+ int64_t _load_jar_timer = 0;
+ int64_t _init_connector_timer = 0;
+ int64_t _get_data_timer = 0;
+ int64_t _check_type_timer = 0;
+ int64_t _execte_read_timer = 0;
+ int64_t _connector_close_timer = 0;
+ };
- JdbcConnector(NewJdbcScanner* jdbc_scanner, const JdbcConnectorParam& param);
+ JdbcConnector(const JdbcConnectorParam& param);
~JdbcConnector() override;
@@ -68,6 +75,8 @@ public:
Status abort_trans() override; // should be call after transaction abort
Status finish_trans() override; // should be call after transaction commit
+ JdbcStatistic& get_jdbc_statistic() { return _jdbc_statistic; }
+
Status close() override;
private:
@@ -89,7 +98,6 @@ private:
Status _cast_string_to_array(const SlotDescriptor* slot_desc, Block* block, int column_index,
int rows);
- NewJdbcScanner* _jdbc_scanner;
const JdbcConnectorParam& _conn_param;
//java.sql.Types: https://docs.oracle.com/javase/7/docs/api/constant-values.html#java.sql.Types.INTEGER
std::map<int, PrimitiveType> _arr_jdbc_map {
@@ -126,6 +134,8 @@ private:
std::vector<MutableColumnPtr>
str_array_cols; // for array type to save data like big string [1,2,3]
+ JdbcStatistic _jdbc_statistic;
+
#define FUNC_VARI_DECLARE(RETURN_TYPE) \
RETURN_TYPE _jobject_to_##RETURN_TYPE(JNIEnv* env, jobject jobj); \
jclass _executor_##RETURN_TYPE##_clazz;
diff --git a/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcExecutor.java b/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcExecutor.java
index 0c9894ad18..d90aa4055a 100644
--- a/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcExecutor.java
+++ b/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcExecutor.java
@@ -272,4 +272,3 @@ public class JdbcExecutor {
}
}
}
-
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org