You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2023/01/08 00:52:05 UTC
[doris] branch branch-1.2-lts updated: [vectorized](jdbc) support array type in jdbc external table (#15303)
This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-1.2-lts by this push:
new 68fbf1709b [vectorized](jdbc) support array type in jdbc external table (#15303)
68fbf1709b is described below
commit 68fbf1709bb445afa3445ec2cae0929e07615920
Author: zhangstar333 <87...@users.noreply.github.com>
AuthorDate: Fri Dec 30 00:29:08 2022 +0800
[vectorized](jdbc) support array type in jdbc external table (#15303)
---
be/src/exec/table_connector.cpp | 291 ++++++++++++---------
be/src/exec/table_connector.h | 7 +-
be/src/vec/core/column_with_type_and_name.cpp | 2 +-
be/src/vec/exec/scan/new_jdbc_scanner.cpp | 2 +-
be/src/vec/exec/vjdbc_connector.cpp | 179 +++++++++++--
be/src/vec/exec/vjdbc_connector.h | 32 ++-
be/src/vec/sink/vjdbc_table_sink.cpp | 4 +-
be/src/vec/sink/vjdbc_table_sink.h | 2 -
.../java/org/apache/doris/analysis/ColumnDef.java | 2 +-
.../org/apache/doris/analysis/CreateTableStmt.java | 2 +-
.../java/org/apache/doris/udf/JdbcExecutor.java | 16 ++
11 files changed, 385 insertions(+), 154 deletions(-)
diff --git a/be/src/exec/table_connector.cpp b/be/src/exec/table_connector.cpp
index fa725832d3..0e07edd48d 100644
--- a/be/src/exec/table_connector.cpp
+++ b/be/src/exec/table_connector.cpp
@@ -17,12 +17,18 @@
#include "exec/table_connector.h"
+#include <gen_cpp/Types_types.h>
+
#include <codecvt>
#include "exprs/expr.h"
+#include "runtime/define_primitive_type.h"
#include "runtime/primitive_type.h"
#include "util/mysql_global.h"
+#include "vec/columns/column_array.h"
#include "vec/core/block.h"
+#include "vec/data_types/data_type.h"
+#include "vec/data_types/data_type_array.h"
#include "vec/exprs/vexpr.h"
#include "vec/exprs/vexpr_context.h"
@@ -157,26 +163,13 @@ Status TableConnector::append(const std::string& table_name, RowBatch* batch,
Status TableConnector::append(const std::string& table_name, vectorized::Block* block,
const std::vector<vectorized::VExprContext*>& output_vexpr_ctxs,
uint32_t start_send_row, uint32_t* num_rows_sent,
- bool need_extra_convert) {
+ TOdbcTableType::type table_type) {
_insert_stmt_buffer.clear();
std::u16string insert_stmt;
{
SCOPED_TIMER(_convert_tuple_timer);
fmt::format_to(_insert_stmt_buffer, "INSERT INTO {} VALUES (", table_name);
- auto extra_convert_func = [&](const std::string_view& str, const bool& is_date) -> void {
- if (!need_extra_convert) {
- fmt::format_to(_insert_stmt_buffer, "'{}'", str);
- } else {
- if (is_date) {
- fmt::format_to(_insert_stmt_buffer, "to_date('{}','yyyy-mm-dd')", str);
- } else {
- fmt::format_to(_insert_stmt_buffer, "to_date('{}','yyyy-mm-dd hh24:mi:ss')",
- str);
- }
- }
- };
-
int num_rows = block->rows();
int num_columns = block->columns();
for (int i = start_send_row; i < num_rows; ++i) {
@@ -189,117 +182,8 @@ Status TableConnector::append(const std::string& table_name, vectorized::Block*
}
auto& column_ptr = block->get_by_position(j).column;
auto& type_ptr = block->get_by_position(j).type;
- vectorized::ColumnPtr column;
- if (type_ptr->is_nullable()) {
- column = assert_cast<const vectorized::ColumnNullable&>(*column_ptr)
- .get_nested_column_ptr();
- if (column_ptr->is_null_at(i)) {
- fmt::format_to(_insert_stmt_buffer, "{}", "NULL");
- continue;
- }
- } else {
- column = column_ptr;
- }
- auto [item, size] = column->get_data_at(i);
- switch (output_vexpr_ctxs[j]->root()->type().type) {
- case TYPE_BOOLEAN:
- case TYPE_TINYINT:
- fmt::format_to(_insert_stmt_buffer, "{}",
- *reinterpret_cast<const int8_t*>(item));
- break;
- case TYPE_SMALLINT:
- fmt::format_to(_insert_stmt_buffer, "{}",
- *reinterpret_cast<const int16_t*>(item));
- break;
- case TYPE_INT:
- fmt::format_to(_insert_stmt_buffer, "{}",
- *reinterpret_cast<const int32_t*>(item));
- break;
- case TYPE_BIGINT:
- fmt::format_to(_insert_stmt_buffer, "{}",
- *reinterpret_cast<const int64_t*>(item));
- break;
- case TYPE_FLOAT:
- fmt::format_to(_insert_stmt_buffer, "{}",
- *reinterpret_cast<const float*>(item));
- break;
- case TYPE_DOUBLE:
- fmt::format_to(_insert_stmt_buffer, "{}",
- *reinterpret_cast<const double*>(item));
- break;
- case TYPE_DATE: {
- vectorized::VecDateTimeValue value =
- binary_cast<int64_t, doris::vectorized::VecDateTimeValue>(
- *(int64_t*)item);
-
- char buf[64];
- char* pos = value.to_string(buf);
- std::string_view str(buf, pos - buf - 1);
- extra_convert_func(str, true);
- break;
- }
- case TYPE_DATETIME: {
- vectorized::VecDateTimeValue value =
- binary_cast<int64_t, doris::vectorized::VecDateTimeValue>(
- *(int64_t*)item);
-
- char buf[64];
- char* pos = value.to_string(buf);
- std::string_view str(buf, pos - buf - 1);
- extra_convert_func(str, false);
- break;
- }
- case TYPE_DATEV2: {
- vectorized::DateV2Value<vectorized::DateV2ValueType> value = binary_cast<
- uint32_t, doris::vectorized::DateV2Value<vectorized::DateV2ValueType>>(
- *(int32_t*)item);
-
- char buf[64];
- char* pos = value.to_string(buf);
- std::string str(buf, pos - buf - 1);
- extra_convert_func(str, true);
- break;
- }
- case TYPE_DATETIMEV2: {
- vectorized::DateV2Value<vectorized::DateTimeV2ValueType> value = binary_cast<
- uint64_t,
- doris::vectorized::DateV2Value<vectorized::DateTimeV2ValueType>>(
- *(int64_t*)item);
-
- char buf[64];
- char* pos = value.to_string(buf, output_vexpr_ctxs[i]->root()->type().scale);
- std::string str(buf, pos - buf - 1);
- extra_convert_func(str, false);
- break;
- }
- case TYPE_VARCHAR:
- case TYPE_CHAR:
- case TYPE_STRING: {
- fmt::format_to(_insert_stmt_buffer, "'{}'", fmt::basic_string_view(item, size));
- break;
- }
- case TYPE_DECIMALV2: {
- DecimalV2Value value = *(DecimalV2Value*)(item);
- fmt::format_to(_insert_stmt_buffer, "{}", value.to_string());
- break;
- }
- case TYPE_DECIMAL32:
- case TYPE_DECIMAL64:
- case TYPE_DECIMAL128I: {
- auto val = type_ptr->to_string(*column, i);
- fmt::format_to(_insert_stmt_buffer, "{}", val);
- break;
- }
- case TYPE_LARGEINT: {
- fmt::format_to(_insert_stmt_buffer, "{}",
- *reinterpret_cast<const __int128*>(item));
- break;
- }
- default: {
- return Status::InternalError("can't convert this type to mysql type. type = {}",
- output_vexpr_ctxs[j]->root()->type().type);
- }
- }
+ RETURN_IF_ERROR(convert_column_data(
+ column_ptr, type_ptr, output_vexpr_ctxs[j]->root()->type(), i, table_type));
}
if (i < num_rows - 1 && _insert_stmt_buffer.size() < INSERT_BUFFER_SIZE) {
@@ -319,4 +203,161 @@ Status TableConnector::append(const std::string& table_name, vectorized::Block*
return Status::OK();
}
+/// Appends the SQL literal for a single cell to `_insert_stmt_buffer`.
+///
+/// @param column_ptr  column holding the value; unwrapped first when `type_ptr` is nullable
+/// @param type_ptr    data type of `column_ptr`
+/// @param type        type descriptor that selects the literal format
+/// @param row         row index inside `column_ptr`
+/// @param table_type  target database; changes date and array literal syntax
+/// @return Status::OK() on success; InternalError for a type that cannot be converted, or
+///         for an array value when the target table type has no known array literal syntax
+Status TableConnector::convert_column_data(const vectorized::ColumnPtr& column_ptr,
+                                           const vectorized::DataTypePtr& type_ptr,
+                                           const TypeDescriptor& type, int row,
+                                           TOdbcTableType::type table_type) {
+    // Date/datetime text: Oracle needs an explicit to_date() conversion, every other target
+    // receives the text wrapped in double quotes.
+    auto extra_convert_func = [&](const std::string_view& str, const bool& is_date) -> void {
+        if (table_type != TOdbcTableType::ORACLE) {
+            fmt::format_to(_insert_stmt_buffer, "\"{}\"", str);
+        } else {
+            //if is ORACLE and date type, insert into need convert
+            if (is_date) {
+                fmt::format_to(_insert_stmt_buffer, "to_date('{}','yyyy-mm-dd')", str);
+            } else {
+                fmt::format_to(_insert_stmt_buffer, "to_date('{}','yyyy-mm-dd hh24:mi:ss')", str);
+            }
+        }
+    };
+
+    // Unwrap a nullable column; a NULL cell is written directly and nothing else is needed.
+    const vectorized::IColumn* column = column_ptr.get();
+    if (type_ptr->is_nullable()) {
+        auto nullable_column = assert_cast<const vectorized::ColumnNullable*>(column_ptr.get());
+        if (nullable_column->is_null_at(row)) {
+            fmt::format_to(_insert_stmt_buffer, "{}", "NULL");
+            return Status::OK();
+        }
+        column = nullable_column->get_nested_column_ptr().get();
+    }
+
+    auto [item, size] = column->get_data_at(row);
+    switch (type.type) {
+    case TYPE_BOOLEAN:
+    case TYPE_TINYINT:
+        fmt::format_to(_insert_stmt_buffer, "{}", *reinterpret_cast<const int8_t*>(item));
+        break;
+    case TYPE_SMALLINT:
+        fmt::format_to(_insert_stmt_buffer, "{}", *reinterpret_cast<const int16_t*>(item));
+        break;
+    case TYPE_INT:
+        fmt::format_to(_insert_stmt_buffer, "{}", *reinterpret_cast<const int32_t*>(item));
+        break;
+    case TYPE_BIGINT:
+        fmt::format_to(_insert_stmt_buffer, "{}", *reinterpret_cast<const int64_t*>(item));
+        break;
+    case TYPE_FLOAT:
+        fmt::format_to(_insert_stmt_buffer, "{}", *reinterpret_cast<const float*>(item));
+        break;
+    case TYPE_DOUBLE:
+        fmt::format_to(_insert_stmt_buffer, "{}", *reinterpret_cast<const double*>(item));
+        break;
+    case TYPE_DATE: {
+        vectorized::VecDateTimeValue value =
+                binary_cast<int64_t, doris::vectorized::VecDateTimeValue>(*(int64_t*)item);
+
+        char buf[64];
+        char* pos = value.to_string(buf);
+        std::string_view str(buf, pos - buf - 1);
+        extra_convert_func(str, true);
+        break;
+    }
+    case TYPE_DATETIME: {
+        vectorized::VecDateTimeValue value =
+                binary_cast<int64_t, doris::vectorized::VecDateTimeValue>(*(int64_t*)item);
+
+        char buf[64];
+        char* pos = value.to_string(buf);
+        std::string_view str(buf, pos - buf - 1);
+        extra_convert_func(str, false);
+        break;
+    }
+    case TYPE_DATEV2: {
+        vectorized::DateV2Value<vectorized::DateV2ValueType> value =
+                binary_cast<uint32_t, doris::vectorized::DateV2Value<vectorized::DateV2ValueType>>(
+                        *(int32_t*)item);
+
+        char buf[64];
+        char* pos = value.to_string(buf);
+        std::string str(buf, pos - buf - 1);
+        extra_convert_func(str, true);
+        break;
+    }
+    case TYPE_DATETIMEV2: {
+        vectorized::DateV2Value<vectorized::DateTimeV2ValueType> value =
+                binary_cast<uint64_t,
+                            doris::vectorized::DateV2Value<vectorized::DateTimeV2ValueType>>(
+                        *(int64_t*)item);
+
+        char buf[64];
+        char* pos = value.to_string(buf, type.scale);
+        std::string str(buf, pos - buf - 1);
+        extra_convert_func(str, false);
+        break;
+    }
+    case TYPE_VARCHAR:
+    case TYPE_CHAR:
+    case TYPE_STRING: {
+        // here need check the ' is used, now for pg array string must be "
+        // NOTE(review): the value is written verbatim; a quote character inside the string is
+        // not escaped, so it can terminate the literal early.
+        fmt::format_to(_insert_stmt_buffer, "\"{}\"", fmt::basic_string_view(item, size));
+        break;
+    }
+    case TYPE_ARRAY: {
+        // Array literal delimiters depend on the target database:
+        //   Doris (MYSQL table type) and ClickHouse ---> [ ... ]
+        //   PostgreSQL                              ---> '{ ... }'
+        const char* open_delim = nullptr;
+        const char* close_delim = nullptr;
+        if (table_type == TOdbcTableType::POSTGRESQL) {
+            open_delim = "'{";
+            close_delim = "}'";
+        } else if (table_type == TOdbcTableType::CLICKHOUSE ||
+                   table_type == TOdbcTableType::MYSQL) {
+            open_delim = "[";
+            close_delim = "]";
+        } else {
+            // Without delimiters the elements would be spliced into the VALUES list as if they
+            // were separate columns, so fail explicitly instead of emitting a broken statement.
+            return Status::InternalError(
+                    "array type is not supported for this table type. type = {}",
+                    type.debug_string());
+        }
+
+        const auto* array_column = assert_cast<const vectorized::ColumnArray*>(column);
+        auto& arr_nested = array_column->get_data_ptr();
+        auto& arr_offset = array_column->get_offsets();
+        auto array_type = remove_nullable(type_ptr);
+        auto nested_type =
+                assert_cast<const vectorized::DataTypeArray&>(*array_type).get_nested_type();
+
+        fmt::format_to(_insert_stmt_buffer, "{}", open_delim);
+        bool first_value = true;
+        // NOTE(review): for row == 0 this reads arr_offset[-1]; presumably the offsets container
+        // is padded so that the read yields 0 — confirm.
+        for (auto idx = arr_offset[row - 1]; idx < arr_offset[row]; ++idx) {
+            if (!first_value) {
+                fmt::format_to(_insert_stmt_buffer, "{}", ", ");
+            }
+            if (arr_nested->is_null_at(idx)) {
+                fmt::format_to(_insert_stmt_buffer, "{}", "NULL");
+            } else {
+                RETURN_IF_ERROR(convert_column_data(arr_nested, nested_type, type.children[0], idx,
+                                                    table_type));
+            }
+            first_value = false;
+        }
+        fmt::format_to(_insert_stmt_buffer, "{}", close_delim);
+        break;
+    }
+    case TYPE_DECIMALV2: {
+        DecimalV2Value value = *(DecimalV2Value*)(item);
+        fmt::format_to(_insert_stmt_buffer, "{}", value.to_string());
+        break;
+    }
+    case TYPE_DECIMAL32:
+    case TYPE_DECIMAL64:
+    case TYPE_DECIMAL128I: {
+        auto val = type_ptr->to_string(*column, row);
+        fmt::format_to(_insert_stmt_buffer, "{}", val);
+        break;
+    }
+    case TYPE_LARGEINT: {
+        fmt::format_to(_insert_stmt_buffer, "{}", *reinterpret_cast<const __int128*>(item));
+        break;
+    }
+    default: {
+        return Status::InternalError("can't convert this type to mysql type. type = {}",
+                                     type.debug_string());
+    }
+    }
+    return Status::OK();
+}
} // namespace doris
diff --git a/be/src/exec/table_connector.h b/be/src/exec/table_connector.h
index 3fa9f5f5b1..f051776b49 100644
--- a/be/src/exec/table_connector.h
+++ b/be/src/exec/table_connector.h
@@ -18,6 +18,7 @@
#pragma once
#include <fmt/format.h>
+#include <gen_cpp/Types_types.h>
#include <boost/format.hpp>
#include <cstdlib>
@@ -58,12 +59,16 @@ public:
Status append(const std::string& table_name, vectorized::Block* block,
const std::vector<vectorized::VExprContext*>& _output_vexpr_ctxs,
uint32_t start_send_row, uint32_t* num_rows_sent,
- bool need_extra_convert = false);
+ TOdbcTableType::type table_type = TOdbcTableType::MYSQL);
void init_profile(RuntimeProfile*);
std::u16string utf8_to_u16string(const char* first, const char* last);
+ Status convert_column_data(const vectorized::ColumnPtr& column_ptr,
+ const vectorized::DataTypePtr& type_ptr, const TypeDescriptor& type,
+ int row, TOdbcTableType::type table_type);
+
virtual Status close() { return Status::OK(); }
protected:
diff --git a/be/src/vec/core/column_with_type_and_name.cpp b/be/src/vec/core/column_with_type_and_name.cpp
index e141efe8f3..e196935bff 100644
--- a/be/src/vec/core/column_with_type_and_name.cpp
+++ b/be/src/vec/core/column_with_type_and_name.cpp
@@ -50,7 +50,7 @@ void ColumnWithTypeAndName::dump_structure(std::ostream& out) const {
out << name;
if (type)
- out << " ";
+ out << " " << type->get_name();
else
out << " nullptr";
diff --git a/be/src/vec/exec/scan/new_jdbc_scanner.cpp b/be/src/vec/exec/scan/new_jdbc_scanner.cpp
index cbb9588bbd..edfb843733 100644
--- a/be/src/vec/exec/scan/new_jdbc_scanner.cpp
+++ b/be/src/vec/exec/scan/new_jdbc_scanner.cpp
@@ -125,7 +125,7 @@ Status NewJdbcScanner::_get_block_impl(RuntimeState* state, Block* block, bool*
}
}
- RETURN_IF_ERROR(_jdbc_connector->get_next(&_jdbc_eos, columns, state->batch_size()));
+ RETURN_IF_ERROR(_jdbc_connector->get_next(&_jdbc_eos, columns, block, state->batch_size()));
if (_jdbc_eos == true) {
if (block->rows() == 0) {
diff --git a/be/src/vec/exec/vjdbc_connector.cpp b/be/src/vec/exec/vjdbc_connector.cpp
index f2302f3baa..ab7fef7b5b 100644
--- a/be/src/vec/exec/vjdbc_connector.cpp
+++ b/be/src/vec/exec/vjdbc_connector.cpp
@@ -25,8 +25,11 @@
#include "runtime/define_primitive_type.h"
#include "runtime/user_function_cache.h"
#include "util/jni-util.h"
+#include "vec/columns/column_array.h"
#include "vec/columns/column_nullable.h"
-#include "vec/exprs/vexpr.h"
+#include "vec/data_types/data_type_factory.hpp"
+#include "vec/data_types/data_type_string.h"
+#include "vec/functions/simple_function_factory.h"
namespace doris {
namespace vectorized {
@@ -36,6 +39,8 @@ const char* JDBC_EXECUTOR_WRITE_SIGNATURE = "(Ljava/lang/String;)I";
const char* JDBC_EXECUTOR_HAS_NEXT_SIGNATURE = "()Z";
const char* JDBC_EXECUTOR_GET_BLOCK_SIGNATURE = "(I)Ljava/util/List;";
const char* JDBC_EXECUTOR_GET_TYPES_SIGNATURE = "()Ljava/util/List;";
+const char* JDBC_EXECUTOR_GET_ARR_LIST_SIGNATURE = "(Ljava/lang/Object;)Ljava/util/List;";
+const char* JDBC_EXECUTOR_GET_ARR_TYPE_SIGNATURE = "()I";
const char* JDBC_EXECUTOR_CLOSE_SIGNATURE = "()V";
const char* JDBC_EXECUTOR_CONVERT_DATE_SIGNATURE = "(Ljava/lang/Object;)J";
const char* JDBC_EXECUTOR_CONVERT_DATETIME_SIGNATURE = "(Ljava/lang/Object;)J";
@@ -195,7 +200,7 @@ Status JdbcConnector::_check_column_type() {
env->CallObjectMethod(type_lists, _executor_get_list_id, materialized_column_index);
const std::string& type_str = _jobject_to_string(env, column_type);
- RETURN_IF_ERROR(_check_type(slot_desc, type_str));
+ RETURN_IF_ERROR(_check_type(slot_desc, type_str, column_index));
env->DeleteLocalRef(column_type);
materialized_column_index++;
}
@@ -222,7 +227,8 @@ DATETIME java.sql.Timestamp java.sql.Timestamp java.sql.Tim
NOTE: because oracle always use number(p,s) to create all numerical type, so it's java type maybe java.math.BigDecimal
*/
-Status JdbcConnector::_check_type(SlotDescriptor* slot_desc, const std::string& type_str) {
+Status JdbcConnector::_check_type(SlotDescriptor* slot_desc, const std::string& type_str,
+ int column_index) {
const std::string error_msg = fmt::format(
"Fail to convert jdbc type of {} to doris type {} on column: {}. You need to "
"check this column type between external table and doris table.",
@@ -289,6 +295,30 @@ Status JdbcConnector::_check_type(SlotDescriptor* slot_desc, const std::string&
}
break;
}
+ case TYPE_ARRAY: {
+ if (type_str != "java.sql.Array" && type_str != "java.lang.String") {
+ return Status::InternalError(error_msg);
+ }
+ if (!slot_desc->type().children[0].children.empty()) {
+ return Status::InternalError("Now doris not support nested array type in array {}.",
+ slot_desc->type().debug_string());
+ }
+ // when the type is array, every database except PostgreSQL returns it as a string that must be cast to array
+ if (_conn_param.table_type != TOdbcTableType::POSTGRESQL) {
+ _need_cast_array_type = true;
+ _map_column_idx_to_cast_idx[column_index] = _input_array_string_types.size();
+ if (slot_desc->is_nullable()) {
+ _input_array_string_types.push_back(
+ make_nullable(std::make_shared<DataTypeString>()));
+ } else {
+ _input_array_string_types.push_back(std::make_shared<DataTypeString>());
+ }
+ str_array_cols.push_back(
+ _input_array_string_types[_map_column_idx_to_cast_idx[column_index]]
+ ->create_column());
+ }
+ break;
+ }
default: {
return Status::InternalError(error_msg);
}
@@ -296,7 +326,8 @@ Status JdbcConnector::_check_type(SlotDescriptor* slot_desc, const std::string&
return Status::OK();
}
-Status JdbcConnector::get_next(bool* eos, std::vector<MutableColumnPtr>& columns, int batch_size) {
+Status JdbcConnector::get_next(bool* eos, std::vector<MutableColumnPtr>& columns, Block* block,
+ int batch_size) {
if (!_is_open) {
return Status::InternalError("get_next before open of jdbc connector.");
}
@@ -322,17 +353,23 @@ Status JdbcConnector::get_next(bool* eos, std::vector<MutableColumnPtr>& columns
if (!slot_desc->is_materialized()) {
continue;
}
+ const std::string& column_name = slot_desc->col_name();
jobject column_data =
env->CallObjectMethod(block_obj, _executor_get_list_id, materialized_column_index);
jint num_rows = env->CallIntMethod(column_data, _executor_get_list_size_id);
for (int row = 0; row < num_rows; ++row) {
jobject cur_data = env->CallObjectMethod(column_data, _executor_get_list_id, row);
- _convert_column_data(env, cur_data, slot_desc, columns[column_index].get());
+ RETURN_IF_ERROR(_convert_column_data(env, cur_data, slot_desc,
+ columns[column_index].get(), column_index,
+ column_name));
env->DeleteLocalRef(cur_data);
}
env->DeleteLocalRef(column_data);
-
+ //here need to cast string to array type
+ if (_need_cast_array_type && slot_desc->type().is_array_type()) {
+ _cast_string_to_array(slot_desc, block, column_index, num_rows);
+ }
materialized_column_index++;
}
// All Java objects returned by JNI functions are local references.
@@ -385,25 +422,43 @@ Status JdbcConnector::_register_func_id(JNIEnv* env) {
JDBC_EXECUTOR_TRANSACTION_SIGNATURE, _executor_abort_trans_id));
RETURN_IF_ERROR(register_id(_executor_clazz, "getResultColumnTypeNames",
JDBC_EXECUTOR_GET_TYPES_SIGNATURE, _executor_get_types_id));
+ RETURN_IF_ERROR(register_id(_executor_clazz, "getArrayColumnData",
+ JDBC_EXECUTOR_GET_ARR_LIST_SIGNATURE, _executor_get_arr_list_id));
+ RETURN_IF_ERROR(register_id(_executor_clazz, "getBaseTypeInt",
+ JDBC_EXECUTOR_GET_ARR_TYPE_SIGNATURE, _executor_get_arr_type_id));
+
return Status::OK();
}
Status JdbcConnector::_convert_column_data(JNIEnv* env, jobject jobj,
const SlotDescriptor* slot_desc,
- vectorized::IColumn* column_ptr) {
+ vectorized::IColumn* column_ptr, int column_index,
+ std::string_view column_name) {
vectorized::IColumn* col_ptr = column_ptr;
if (true == slot_desc->is_nullable()) {
auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(column_ptr);
if (jobj == nullptr) {
nullable_column->insert_data(nullptr, 0);
+ if (_need_cast_array_type && slot_desc->type().type == TYPE_ARRAY) {
+ reinterpret_cast<vectorized::ColumnNullable*>(
+ str_array_cols[_map_column_idx_to_cast_idx[column_index]].get())
+ ->insert_data(nullptr, 0);
+ }
return Status::OK();
} else {
nullable_column->get_null_map_data().push_back(0);
col_ptr = &nullable_column->get_nested_column();
}
}
+ RETURN_IF_ERROR(
+ _insert_column_data(env, jobj, slot_desc->type(), col_ptr, column_index, column_name));
+ return Status::OK();
+}
- switch (slot_desc->type().type) {
+Status JdbcConnector::_insert_column_data(JNIEnv* env, jobject jobj, const TypeDescriptor& type,
+ vectorized::IColumn* col_ptr, int column_index,
+ std::string_view column_name) {
+ switch (type.type) {
#define M(TYPE, CPP_TYPE, COLUMN_TYPE) \
case TYPE: { \
CPP_TYPE num = _jobject_to_##CPP_TYPE(env, jobj); \
@@ -468,8 +523,7 @@ Status JdbcConnector::_convert_column_data(JNIEnv* env, jobject jobj,
std::string data = _jobject_to_string(env, jobj);
StringParser::ParseResult result = StringParser::PARSE_SUCCESS;
const Int32 decimal_slot = StringParser::string_to_decimal<Int32>(
- data.c_str(), data.length(), slot_desc->type().precision, slot_desc->type().scale,
- &result);
+ data.c_str(), data.length(), type.precision, type.scale, &result);
reinterpret_cast<vectorized::ColumnDecimal32*>(col_ptr)->insert_data(
reinterpret_cast<const char*>(&decimal_slot), 0);
break;
@@ -478,8 +532,7 @@ Status JdbcConnector::_convert_column_data(JNIEnv* env, jobject jobj,
std::string data = _jobject_to_string(env, jobj);
StringParser::ParseResult result = StringParser::PARSE_SUCCESS;
const Int64 decimal_slot = StringParser::string_to_decimal<Int64>(
- data.c_str(), data.length(), slot_desc->type().precision, slot_desc->type().scale,
- &result);
+ data.c_str(), data.length(), type.precision, type.scale, &result);
reinterpret_cast<vectorized::ColumnDecimal64*>(col_ptr)->insert_data(
reinterpret_cast<const char*>(&decimal_slot), 0);
break;
@@ -488,22 +541,114 @@ Status JdbcConnector::_convert_column_data(JNIEnv* env, jobject jobj,
std::string data = _jobject_to_string(env, jobj);
StringParser::ParseResult result = StringParser::PARSE_SUCCESS;
const Int128 decimal_slot = StringParser::string_to_decimal<Int128>(
- data.c_str(), data.length(), slot_desc->type().precision, slot_desc->type().scale,
- &result);
+ data.c_str(), data.length(), type.precision, type.scale, &result);
reinterpret_cast<vectorized::ColumnDecimal128I*>(col_ptr)->insert_data(
reinterpret_cast<const char*>(&decimal_slot), 0);
break;
}
+ case TYPE_ARRAY: {
+ if (_need_cast_array_type) {
+ // read array data is a big string: [1,2,3], need cast it by self
+ std::string data = _jobject_to_string(env, jobj);
+ str_array_cols[_map_column_idx_to_cast_idx[column_index]]->insert_data(data.c_str(),
+ data.length());
+ } else {
+ //POSTGRESQL read array is object[], so could get data by index
+ jobject arr_lists = env->CallNonvirtualObjectMethod(_executor_obj, _executor_clazz,
+ _executor_get_arr_list_id, jobj);
+ jint arr_type = env->CallNonvirtualIntMethod(_executor_obj, _executor_clazz,
+ _executor_get_arr_type_id);
+ // this type check may be unnecessary; extra checks cost performance
+ if (_arr_jdbc_map[arr_type] != type.children[0].type) {
+ const std::string& error_msg = fmt::format(
+ "Fail to convert jdbc value to array type of {} on column: {}, could check "
+ "this column type between external table and doris table. {}.{} ",
+ type.children[0].debug_string(), column_name, _arr_jdbc_map[arr_type],
+ arr_type);
+ return Status::InternalError(std::string(error_msg));
+ }
+ jint num_rows = env->CallIntMethod(arr_lists, _executor_get_list_size_id);
+ RETURN_IF_ERROR(_insert_arr_column_data(env, arr_lists, type.children[0], num_rows,
+ col_ptr, column_index, column_name));
+ env->DeleteLocalRef(arr_lists);
+ }
+ break;
+ }
default: {
- std::string error_msg =
- fmt::format("Fail to convert jdbc value to {} on column: {}.",
- slot_desc->type().debug_string(), slot_desc->col_name());
+ const std::string& error_msg = fmt::format(
+ "Fail to convert jdbc value to {} on column: {}, could check this column type "
+ "between external table and doris table.",
+ type.debug_string(), column_name);
return Status::InternalError(std::string(error_msg));
}
}
return Status::OK();
}
+/// Appends the elements of one JDBC array value to an array column and records the new
+/// end offset for that row.
+///
+/// @param env             JNI environment used to read the list elements
+/// @param arr_lists       java.util.List holding the array elements
+/// @param type            type descriptor of a single element
+/// @param nums            number of elements in `arr_lists`
+/// @param arr_column_ptr  destination column, treated as a ColumnArray
+/// @param column_index    forwarded to `_insert_column_data`
+/// @param column_name     forwarded to `_insert_column_data` for error messages
+/// @return the first error reported by `_insert_column_data`, otherwise Status::OK()
+Status JdbcConnector::_insert_arr_column_data(JNIEnv* env, jobject arr_lists,
+                                              const TypeDescriptor& type, int nums,
+                                              vectorized::IColumn* arr_column_ptr, int column_index,
+                                              std::string_view column_name) {
+    auto* array_column = reinterpret_cast<vectorized::ColumnArray*>(arr_column_ptr);
+    auto& arr_nested = array_column->get_data();
+    // NOTE(review): the nested column is treated as ColumnNullable without a check — confirm
+    // that array element columns are always nullable here.
+    auto& nullable_nested = reinterpret_cast<vectorized::ColumnNullable&>(arr_nested);
+    vectorized::IColumn* col_ptr = nullable_nested.get_nested_column_ptr();
+    auto& nullmap_data = nullable_nested.get_null_map_data();
+    for (int i = 0; i < nums; ++i) {
+        jobject cur_data = env->CallObjectMethod(arr_lists, _executor_get_list_id, i);
+        if (cur_data == nullptr) {
+            arr_nested.insert_default();
+            continue;
+        }
+        nullmap_data.push_back(0);
+        // Release the local reference before propagating an error so that the failure path
+        // does not leak it.
+        Status st = _insert_column_data(env, cur_data, type, col_ptr, column_index, column_name);
+        env->DeleteLocalRef(cur_data);
+        RETURN_IF_ERROR(st);
+    }
+    auto& offsets = array_column->get_offsets();
+    auto old_size = offsets.back();
+    offsets.push_back(nums + old_size);
+    return Status::OK();
+}
+
+/// Converts the buffered string representation of an array column into the slot's array
+/// type with the CAST function and stores the result in `block` at `column_index`.
+/// The string buffer for the column is replaced by a fresh empty column afterwards.
+///
+/// @param slot_desc     slot describing the target array type
+/// @param block         block whose column at `column_index` is replaced by the cast result
+/// @param column_index  index of the column, also the key into `_map_column_idx_to_cast_idx`
+/// @param rows          number of rows passed to the CAST function
+/// @return InternalError when the CAST function cannot be resolved, the error returned by
+///         the CAST execution, or Status::OK()
+Status JdbcConnector::_cast_string_to_array(const SlotDescriptor* slot_desc, Block* block,
+                                            int column_index, int rows) {
+    const int cast_idx = _map_column_idx_to_cast_idx[column_index];
+
+    DataTypePtr target_type = slot_desc->get_data_type_ptr();
+    std::string target_type_name = DataTypeFactory::instance().get(target_type);
+    DataTypePtr cast_param_type = std::make_shared<DataTypeString>();
+    ColumnPtr cast_param = cast_param_type->create_column_const(1, target_type_name);
+
+    // Arguments of CAST: the buffered string column and a constant naming the target type.
+    ColumnsWithTypeAndName argument_template;
+    argument_template.reserve(2);
+    argument_template.emplace_back(std::move(str_array_cols[cast_idx]),
+                                   _input_array_string_types[cast_idx], "java.sql.String");
+    argument_template.emplace_back(cast_param, cast_param_type, target_type_name);
+    FunctionBasePtr func_cast = SimpleFunctionFactory::instance().get_function(
+            "CAST", argument_template, make_nullable(target_type));
+    if (func_cast == nullptr) {
+        return Status::InternalError("Fail to get CAST function to convert string to {}.",
+                                     target_type_name);
+    }
+
+    Block cast_block(argument_template);
+    int result_idx = cast_block.columns();
+    cast_block.insert({nullptr, make_nullable(target_type), "cast_result"});
+    // Propagate a failed cast instead of continuing with an unset result column.
+    RETURN_IF_ERROR(func_cast->execute(nullptr, cast_block, {0, 1}, result_idx, rows));
+
+    auto res_col = cast_block.get_by_position(result_idx).column;
+    if (target_type->is_nullable()) {
+        block->replace_by_position(column_index, res_col);
+    } else {
+        // The cast result is always nullable; strip the wrapper for a non-nullable slot.
+        auto nested_ptr = reinterpret_cast<const vectorized::ColumnNullable*>(res_col.get())
+                                  ->get_nested_column_ptr();
+        block->replace_by_position(column_index, nested_ptr);
+    }
+    // The string buffer was moved into the cast arguments; start a new one for the next batch.
+    str_array_cols[cast_idx] = _input_array_string_types[cast_idx]->create_column();
+    return Status::OK();
+}
+
Status JdbcConnector::exec_write_sql(const std::u16string& insert_stmt,
const fmt::memory_buffer& insert_stmt_buffer) {
SCOPED_TIMER(_result_send_timer);
diff --git a/be/src/vec/exec/vjdbc_connector.h b/be/src/vec/exec/vjdbc_connector.h
index 84ca17e02c..0d51fd278c 100644
--- a/be/src/vec/exec/vjdbc_connector.h
+++ b/be/src/vec/exec/vjdbc_connector.h
@@ -19,8 +19,12 @@
#include <jni.h>
+#include <string_view>
+
#include "common/status.h"
#include "exec/table_connector.h"
+#include "runtime/define_primitive_type.h"
+#include "vec/data_types/data_type.h"
namespace doris {
namespace vectorized {
@@ -51,7 +55,8 @@ public:
Status exec_write_sql(const std::u16string& insert_stmt,
const fmt::memory_buffer& insert_stmt_buffer) override;
- Status get_next(bool* eos, std::vector<MutableColumnPtr>& columns, int batch_size);
+ Status get_next(bool* eos, std::vector<MutableColumnPtr>& columns, Block* block,
+ int batch_size);
// use in JDBC transaction
Status begin_trans() override; // should be call after connect and before query or init_to_write
@@ -63,14 +68,28 @@ public:
private:
Status _register_func_id(JNIEnv* env);
Status _check_column_type();
- Status _check_type(SlotDescriptor*, const std::string& type_str);
+ Status _check_type(SlotDescriptor*, const std::string& type_str, int column_index);
Status _convert_column_data(JNIEnv* env, jobject jobj, const SlotDescriptor* slot_desc,
- vectorized::IColumn* column_ptr);
+ vectorized::IColumn* column_ptr, int column_index,
+ std::string_view column_name);
+ Status _insert_column_data(JNIEnv* env, jobject jobj, const TypeDescriptor& type,
+ vectorized::IColumn* column_ptr, int column_index,
+ std::string_view column_name);
+ Status _insert_arr_column_data(JNIEnv* env, jobject jobj, const TypeDescriptor& type, int nums,
+ vectorized::IColumn* column_ptr, int column_index,
+ std::string_view column_name);
std::string _jobject_to_string(JNIEnv* env, jobject jobj);
int64_t _jobject_to_date(JNIEnv* env, jobject jobj);
int64_t _jobject_to_datetime(JNIEnv* env, jobject jobj);
+ Status _cast_string_to_array(const SlotDescriptor* slot_desc, Block* block, int column_index,
+ int rows);
const JdbcConnectorParam& _conn_param;
+ //java.sql.Types: https://docs.oracle.com/javase/7/docs/api/constant-values.html#java.sql.Types.INTEGER
+ std::map<int, PrimitiveType> _arr_jdbc_map {
+ {-7, TYPE_BOOLEAN}, {-6, TYPE_TINYINT}, {5, TYPE_SMALLINT}, {4, TYPE_INT},
+ {-5, TYPE_BIGINT}, {12, TYPE_STRING}, {7, TYPE_FLOAT}, {8, TYPE_DOUBLE},
+ {91, TYPE_DATE}, {93, TYPE_DATETIME}, {2, TYPE_DECIMALV2}};
bool _closed;
jclass _executor_clazz;
jclass _executor_list_clazz;
@@ -83,6 +102,8 @@ private:
jmethodID _executor_has_next_id;
jmethodID _executor_get_blocks_id;
jmethodID _executor_get_types_id;
+ jmethodID _executor_get_arr_list_id;
+ jmethodID _executor_get_arr_type_id;
jmethodID _executor_close_id;
jmethodID _executor_get_list_id;
jmethodID _executor_get_list_size_id;
@@ -93,6 +114,11 @@ private:
jmethodID _executor_begin_trans_id;
jmethodID _executor_finish_trans_id;
jmethodID _executor_abort_trans_id;
+ bool _need_cast_array_type;
+ std::map<int, int> _map_column_idx_to_cast_idx;
+ std::vector<DataTypePtr> _input_array_string_types;
+ std::vector<MutableColumnPtr>
+ str_array_cols; // for array type to save data like big string [1,2,3]
#define FUNC_VARI_DECLARE(RETURN_TYPE) \
RETURN_TYPE _jobject_to_##RETURN_TYPE(JNIEnv* env, jobject jobj); \
diff --git a/be/src/vec/sink/vjdbc_table_sink.cpp b/be/src/vec/sink/vjdbc_table_sink.cpp
index 0d9e457cff..dba302a52b 100644
--- a/be/src/vec/sink/vjdbc_table_sink.cpp
+++ b/be/src/vec/sink/vjdbc_table_sink.cpp
@@ -44,9 +44,9 @@ Status VJdbcTableSink::init(const TDataSink& t_sink) {
_jdbc_param.driver_path = t_jdbc_sink.jdbc_table.jdbc_driver_url;
_jdbc_param.driver_checksum = t_jdbc_sink.jdbc_table.jdbc_driver_checksum;
_jdbc_param.resource_name = t_jdbc_sink.jdbc_table.jdbc_resource_name;
+ _jdbc_param.table_type = t_jdbc_sink.table_type;
_table_name = t_jdbc_sink.jdbc_table.jdbc_table_name;
_use_transaction = t_jdbc_sink.use_transaction;
- _need_extra_convert = (t_jdbc_sink.table_type == TOdbcTableType::ORACLE);
return Status::OK();
}
@@ -81,7 +81,7 @@ Status VJdbcTableSink::send(RuntimeState* state, Block* block) {
uint32_t num_row_sent = 0;
while (start_send_row < output_block.rows()) {
RETURN_IF_ERROR(_writer->append(_table_name, &output_block, _output_vexpr_ctxs,
- start_send_row, &num_row_sent, _need_extra_convert));
+ start_send_row, &num_row_sent, _jdbc_param.table_type));
start_send_row += num_row_sent;
num_row_sent = 0;
}
diff --git a/be/src/vec/sink/vjdbc_table_sink.h b/be/src/vec/sink/vjdbc_table_sink.h
index 52238942a2..d6e91ef0b7 100644
--- a/be/src/vec/sink/vjdbc_table_sink.h
+++ b/be/src/vec/sink/vjdbc_table_sink.h
@@ -39,8 +39,6 @@ public:
private:
JdbcConnectorParam _jdbc_param;
std::unique_ptr<JdbcConnector> _writer;
- //if is ORACLE and date type, insert into need convert
- bool _need_extra_convert = false;
};
} // namespace vectorized
} // namespace doris
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ColumnDef.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ColumnDef.java
index 36b556c623..66670db2f2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ColumnDef.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ColumnDef.java
@@ -266,7 +266,7 @@ public class ColumnDef {
defaultValue = DefaultValue.BITMAP_EMPTY_DEFAULT_VALUE;
}
- if (type.getPrimitiveType() == PrimitiveType.ARRAY) {
+ if (type.getPrimitiveType() == PrimitiveType.ARRAY && isOlap) {
if (isKey()) {
throw new AnalysisException("Array can only be used in the non-key column of"
+ " the duplicate table at present.");
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java
index b09ea77ff3..b7994b8e3d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java
@@ -390,7 +390,7 @@ public class CreateTableStmt extends DdlStmt {
for (ColumnDef columnDef : columnDefs) {
columnDef.analyze(engineName.equals("olap"));
- if (columnDef.getType().isArrayType()) {
+ if (columnDef.getType().isArrayType() && engineName.equals("olap")) {
if (columnDef.getAggregateType() != null && columnDef.getAggregateType() != AggregateType.NONE) {
throw new AnalysisException("Array column can't support aggregation "
+ columnDef.getAggregateType());
diff --git a/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcExecutor.java b/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcExecutor.java
index 9539f65aba..d48a679a6c 100644
--- a/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcExecutor.java
+++ b/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcExecutor.java
@@ -42,6 +42,7 @@ import java.sql.Timestamp;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
public class JdbcExecutor {
@@ -54,6 +55,7 @@ public class JdbcExecutor {
// Use HikariDataSource to help us manage the JDBC connections.
private HikariDataSource dataSource = null;
private List<String> resultColumnTypeNames = null;
+ private int baseTypeInt = 0;
public JdbcExecutor(byte[] thriftParams) throws Exception {
TJdbcExecutorCtorParams request = new TJdbcExecutorCtorParams();
@@ -124,6 +126,20 @@ public class JdbcExecutor {
}
}
+    /**
+     * Unpacks a {@link java.sql.Array} into a list of its elements and remembers the
+     * array's base type code in {@code baseTypeInt}.
+     *
+     * @param object the value to unpack; it is cast to {@code java.sql.Array}
+     * @return a list view over the array's elements
+     * @throws UdfRuntimeException if reading the array raises a {@link SQLException}
+     */
+    public List<Object> getArrayColumnData(Object object) throws UdfRuntimeException {
+        final java.sql.Array sqlArray = (java.sql.Array) object;
+        try {
+            baseTypeInt = sqlArray.getBaseType();
+            final Object[] elements = (Object[]) sqlArray.getArray();
+            return Arrays.asList(elements);
+        } catch (SQLException e) {
+            throw new UdfRuntimeException("JDBC executor getArrayColumnData has error: ", e);
+        }
+    }
+
+ /**
+  * Returns the base type code stored by the most recent getArrayColumnData call
+  * (0 if that method has not stored one yet).
+  */
+ public int getBaseTypeInt() {
+ return baseTypeInt;
+ }
+
public void openTrans() throws UdfRuntimeException {
try {
if (conn != null) {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org