You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2023/01/08 00:52:05 UTC

[doris] branch branch-1.2-lts updated: [vectorized](jdbc) support array type in jdbc external table (#15303)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-1.2-lts by this push:
     new 68fbf1709b [vectorized](jdbc) support array type in jdbc external table (#15303)
68fbf1709b is described below

commit 68fbf1709bb445afa3445ec2cae0929e07615920
Author: zhangstar333 <87...@users.noreply.github.com>
AuthorDate: Fri Dec 30 00:29:08 2022 +0800

    [vectorized](jdbc) support array type in jdbc external table (#15303)
---
 be/src/exec/table_connector.cpp                    | 291 ++++++++++++---------
 be/src/exec/table_connector.h                      |   7 +-
 be/src/vec/core/column_with_type_and_name.cpp      |   2 +-
 be/src/vec/exec/scan/new_jdbc_scanner.cpp          |   2 +-
 be/src/vec/exec/vjdbc_connector.cpp                | 179 +++++++++++--
 be/src/vec/exec/vjdbc_connector.h                  |  32 ++-
 be/src/vec/sink/vjdbc_table_sink.cpp               |   4 +-
 be/src/vec/sink/vjdbc_table_sink.h                 |   2 -
 .../java/org/apache/doris/analysis/ColumnDef.java  |   2 +-
 .../org/apache/doris/analysis/CreateTableStmt.java |   2 +-
 .../java/org/apache/doris/udf/JdbcExecutor.java    |  16 ++
 11 files changed, 385 insertions(+), 154 deletions(-)

diff --git a/be/src/exec/table_connector.cpp b/be/src/exec/table_connector.cpp
index fa725832d3..0e07edd48d 100644
--- a/be/src/exec/table_connector.cpp
+++ b/be/src/exec/table_connector.cpp
@@ -17,12 +17,18 @@
 
 #include "exec/table_connector.h"
 
+#include <gen_cpp/Types_types.h>
+
 #include <codecvt>
 
 #include "exprs/expr.h"
+#include "runtime/define_primitive_type.h"
 #include "runtime/primitive_type.h"
 #include "util/mysql_global.h"
+#include "vec/columns/column_array.h"
 #include "vec/core/block.h"
+#include "vec/data_types/data_type.h"
+#include "vec/data_types/data_type_array.h"
 #include "vec/exprs/vexpr.h"
 #include "vec/exprs/vexpr_context.h"
 
@@ -157,26 +163,13 @@ Status TableConnector::append(const std::string& table_name, RowBatch* batch,
 Status TableConnector::append(const std::string& table_name, vectorized::Block* block,
                               const std::vector<vectorized::VExprContext*>& output_vexpr_ctxs,
                               uint32_t start_send_row, uint32_t* num_rows_sent,
-                              bool need_extra_convert) {
+                              TOdbcTableType::type table_type) {
     _insert_stmt_buffer.clear();
     std::u16string insert_stmt;
     {
         SCOPED_TIMER(_convert_tuple_timer);
         fmt::format_to(_insert_stmt_buffer, "INSERT INTO {} VALUES (", table_name);
 
-        auto extra_convert_func = [&](const std::string_view& str, const bool& is_date) -> void {
-            if (!need_extra_convert) {
-                fmt::format_to(_insert_stmt_buffer, "'{}'", str);
-            } else {
-                if (is_date) {
-                    fmt::format_to(_insert_stmt_buffer, "to_date('{}','yyyy-mm-dd')", str);
-                } else {
-                    fmt::format_to(_insert_stmt_buffer, "to_date('{}','yyyy-mm-dd hh24:mi:ss')",
-                                   str);
-                }
-            }
-        };
-
         int num_rows = block->rows();
         int num_columns = block->columns();
         for (int i = start_send_row; i < num_rows; ++i) {
@@ -189,117 +182,8 @@ Status TableConnector::append(const std::string& table_name, vectorized::Block*
                 }
                 auto& column_ptr = block->get_by_position(j).column;
                 auto& type_ptr = block->get_by_position(j).type;
-                vectorized::ColumnPtr column;
-                if (type_ptr->is_nullable()) {
-                    column = assert_cast<const vectorized::ColumnNullable&>(*column_ptr)
-                                     .get_nested_column_ptr();
-                    if (column_ptr->is_null_at(i)) {
-                        fmt::format_to(_insert_stmt_buffer, "{}", "NULL");
-                        continue;
-                    }
-                } else {
-                    column = column_ptr;
-                }
-                auto [item, size] = column->get_data_at(i);
-                switch (output_vexpr_ctxs[j]->root()->type().type) {
-                case TYPE_BOOLEAN:
-                case TYPE_TINYINT:
-                    fmt::format_to(_insert_stmt_buffer, "{}",
-                                   *reinterpret_cast<const int8_t*>(item));
-                    break;
-                case TYPE_SMALLINT:
-                    fmt::format_to(_insert_stmt_buffer, "{}",
-                                   *reinterpret_cast<const int16_t*>(item));
-                    break;
-                case TYPE_INT:
-                    fmt::format_to(_insert_stmt_buffer, "{}",
-                                   *reinterpret_cast<const int32_t*>(item));
-                    break;
-                case TYPE_BIGINT:
-                    fmt::format_to(_insert_stmt_buffer, "{}",
-                                   *reinterpret_cast<const int64_t*>(item));
-                    break;
-                case TYPE_FLOAT:
-                    fmt::format_to(_insert_stmt_buffer, "{}",
-                                   *reinterpret_cast<const float*>(item));
-                    break;
-                case TYPE_DOUBLE:
-                    fmt::format_to(_insert_stmt_buffer, "{}",
-                                   *reinterpret_cast<const double*>(item));
-                    break;
-                case TYPE_DATE: {
-                    vectorized::VecDateTimeValue value =
-                            binary_cast<int64_t, doris::vectorized::VecDateTimeValue>(
-                                    *(int64_t*)item);
-
-                    char buf[64];
-                    char* pos = value.to_string(buf);
-                    std::string_view str(buf, pos - buf - 1);
-                    extra_convert_func(str, true);
-                    break;
-                }
-                case TYPE_DATETIME: {
-                    vectorized::VecDateTimeValue value =
-                            binary_cast<int64_t, doris::vectorized::VecDateTimeValue>(
-                                    *(int64_t*)item);
-
-                    char buf[64];
-                    char* pos = value.to_string(buf);
-                    std::string_view str(buf, pos - buf - 1);
-                    extra_convert_func(str, false);
-                    break;
-                }
-                case TYPE_DATEV2: {
-                    vectorized::DateV2Value<vectorized::DateV2ValueType> value = binary_cast<
-                            uint32_t, doris::vectorized::DateV2Value<vectorized::DateV2ValueType>>(
-                            *(int32_t*)item);
-
-                    char buf[64];
-                    char* pos = value.to_string(buf);
-                    std::string str(buf, pos - buf - 1);
-                    extra_convert_func(str, true);
-                    break;
-                }
-                case TYPE_DATETIMEV2: {
-                    vectorized::DateV2Value<vectorized::DateTimeV2ValueType> value = binary_cast<
-                            uint64_t,
-                            doris::vectorized::DateV2Value<vectorized::DateTimeV2ValueType>>(
-                            *(int64_t*)item);
-
-                    char buf[64];
-                    char* pos = value.to_string(buf, output_vexpr_ctxs[i]->root()->type().scale);
-                    std::string str(buf, pos - buf - 1);
-                    extra_convert_func(str, false);
-                    break;
-                }
-                case TYPE_VARCHAR:
-                case TYPE_CHAR:
-                case TYPE_STRING: {
-                    fmt::format_to(_insert_stmt_buffer, "'{}'", fmt::basic_string_view(item, size));
-                    break;
-                }
-                case TYPE_DECIMALV2: {
-                    DecimalV2Value value = *(DecimalV2Value*)(item);
-                    fmt::format_to(_insert_stmt_buffer, "{}", value.to_string());
-                    break;
-                }
-                case TYPE_DECIMAL32:
-                case TYPE_DECIMAL64:
-                case TYPE_DECIMAL128I: {
-                    auto val = type_ptr->to_string(*column, i);
-                    fmt::format_to(_insert_stmt_buffer, "{}", val);
-                    break;
-                }
-                case TYPE_LARGEINT: {
-                    fmt::format_to(_insert_stmt_buffer, "{}",
-                                   *reinterpret_cast<const __int128*>(item));
-                    break;
-                }
-                default: {
-                    return Status::InternalError("can't convert this type to mysql type. type = {}",
-                                                 output_vexpr_ctxs[j]->root()->type().type);
-                }
-                }
+                RETURN_IF_ERROR(convert_column_data(
+                        column_ptr, type_ptr, output_vexpr_ctxs[j]->root()->type(), i, table_type));
             }
 
             if (i < num_rows - 1 && _insert_stmt_buffer.size() < INSERT_BUFFER_SIZE) {
@@ -319,4 +203,161 @@ Status TableConnector::append(const std::string& table_name, vectorized::Block*
     return Status::OK();
 }
 
+Status TableConnector::convert_column_data(const vectorized::ColumnPtr& column_ptr,
+                                           const vectorized::DataTypePtr& type_ptr,
+                                           const TypeDescriptor& type, int row,
+                                           TOdbcTableType::type table_type) {
+    auto extra_convert_func = [&](const std::string_view& str, const bool& is_date) -> void {
+        if (table_type != TOdbcTableType::ORACLE) {
+            fmt::format_to(_insert_stmt_buffer, "\"{}\"", str);
+        } else {
+            // ORACLE needs date/datetime values wrapped in to_date() on insert
+            if (is_date) {
+                fmt::format_to(_insert_stmt_buffer, "to_date('{}','yyyy-mm-dd')", str);
+            } else {
+                fmt::format_to(_insert_stmt_buffer, "to_date('{}','yyyy-mm-dd hh24:mi:ss')", str);
+            }
+        }
+    };
+    const vectorized::IColumn* column = column_ptr;
+    if (type_ptr->is_nullable()) {
+        auto nullable_column = assert_cast<const vectorized::ColumnNullable*>(column_ptr.get());
+        if (nullable_column->is_null_at(row)) {
+            fmt::format_to(_insert_stmt_buffer, "{}", "NULL");
+            return Status::OK();
+        }
+        column = nullable_column->get_nested_column_ptr().get();
+    } else {
+        column = column_ptr;
+    }
+    auto [item, size] = column->get_data_at(row);
+    switch (type.type) {
+    case TYPE_BOOLEAN:
+    case TYPE_TINYINT:
+        fmt::format_to(_insert_stmt_buffer, "{}", *reinterpret_cast<const int8_t*>(item));
+        break;
+    case TYPE_SMALLINT:
+        fmt::format_to(_insert_stmt_buffer, "{}", *reinterpret_cast<const int16_t*>(item));
+        break;
+    case TYPE_INT:
+        fmt::format_to(_insert_stmt_buffer, "{}", *reinterpret_cast<const int32_t*>(item));
+        break;
+    case TYPE_BIGINT:
+        fmt::format_to(_insert_stmt_buffer, "{}", *reinterpret_cast<const int64_t*>(item));
+        break;
+    case TYPE_FLOAT:
+        fmt::format_to(_insert_stmt_buffer, "{}", *reinterpret_cast<const float*>(item));
+        break;
+    case TYPE_DOUBLE:
+        fmt::format_to(_insert_stmt_buffer, "{}", *reinterpret_cast<const double*>(item));
+        break;
+    case TYPE_DATE: {
+        vectorized::VecDateTimeValue value =
+                binary_cast<int64_t, doris::vectorized::VecDateTimeValue>(*(int64_t*)item);
+
+        char buf[64];
+        char* pos = value.to_string(buf);
+        std::string_view str(buf, pos - buf - 1);
+        extra_convert_func(str, true);
+        break;
+    }
+    case TYPE_DATETIME: {
+        vectorized::VecDateTimeValue value =
+                binary_cast<int64_t, doris::vectorized::VecDateTimeValue>(*(int64_t*)item);
+
+        char buf[64];
+        char* pos = value.to_string(buf);
+        std::string_view str(buf, pos - buf - 1);
+        extra_convert_func(str, false);
+        break;
+    }
+    case TYPE_DATEV2: {
+        vectorized::DateV2Value<vectorized::DateV2ValueType> value =
+                binary_cast<uint32_t, doris::vectorized::DateV2Value<vectorized::DateV2ValueType>>(
+                        *(int32_t*)item);
+
+        char buf[64];
+        char* pos = value.to_string(buf);
+        std::string str(buf, pos - buf - 1);
+        extra_convert_func(str, true);
+        break;
+    }
+    case TYPE_DATETIMEV2: {
+        vectorized::DateV2Value<vectorized::DateTimeV2ValueType> value =
+                binary_cast<uint64_t,
+                            doris::vectorized::DateV2Value<vectorized::DateTimeV2ValueType>>(
+                        *(int64_t*)item);
+
+        char buf[64];
+        char* pos = value.to_string(buf, type.scale);
+        std::string str(buf, pos - buf - 1);
+        extra_convert_func(str, false);
+        break;
+    }
+    case TYPE_VARCHAR:
+    case TYPE_CHAR:
+    case TYPE_STRING: {
+        // TODO: check whether ' can be used here; for now strings inside a pg array must use "
+        fmt::format_to(_insert_stmt_buffer, "\"{}\"", fmt::basic_string_view(item, size));
+        break;
+    }
+    case TYPE_ARRAY: {
+        auto& arr_nested = reinterpret_cast<const vectorized::ColumnArray*>(column)->get_data_ptr();
+        auto& arr_offset = reinterpret_cast<const vectorized::ColumnArray*>(column)->get_offsets();
+        auto array_type = remove_nullable(type_ptr);
+        auto nested_type =
+                reinterpret_cast<const vectorized::DataTypeArray&>(*array_type).get_nested_type();
+
+        // for Doris and ClickHouse, INSERT INTO writes the array as []
+        // for PostgreSQL, INSERT INTO writes the array as '{}'
+        if (table_type == TOdbcTableType::POSTGRESQL) {
+            fmt::format_to(_insert_stmt_buffer, "{}", "'{");
+        } else if (table_type == TOdbcTableType::CLICKHOUSE ||
+                   table_type == TOdbcTableType::MYSQL) {
+            fmt::format_to(_insert_stmt_buffer, "{}", "[");
+        }
+        bool first_value = true;
+        for (auto idx = arr_offset[row - 1]; idx < arr_offset[row]; ++idx) {
+            if (first_value == false) {
+                fmt::format_to(_insert_stmt_buffer, "{}", ", ");
+            }
+            if (arr_nested->is_null_at(idx)) {
+                fmt::format_to(_insert_stmt_buffer, "{}", "NULL");
+            } else {
+                RETURN_IF_ERROR(convert_column_data(arr_nested, nested_type, type.children[0], idx,
+                                                    table_type));
+            }
+            first_value = false;
+        }
+        if (table_type == TOdbcTableType::POSTGRESQL) {
+            fmt::format_to(_insert_stmt_buffer, "{}", "}'");
+        } else if (table_type == TOdbcTableType::CLICKHOUSE ||
+                   table_type == TOdbcTableType::MYSQL) {
+            fmt::format_to(_insert_stmt_buffer, "{}", "]");
+        }
+        break;
+    }
+    case TYPE_DECIMALV2: {
+        DecimalV2Value value = *(DecimalV2Value*)(item);
+        fmt::format_to(_insert_stmt_buffer, "{}", value.to_string());
+        break;
+    }
+    case TYPE_DECIMAL32:
+    case TYPE_DECIMAL64:
+    case TYPE_DECIMAL128I: {
+        auto val = type_ptr->to_string(*column, row);
+        fmt::format_to(_insert_stmt_buffer, "{}", val);
+        break;
+    }
+    case TYPE_LARGEINT: {
+        fmt::format_to(_insert_stmt_buffer, "{}", *reinterpret_cast<const __int128*>(item));
+        break;
+    }
+    default: {
+        return Status::InternalError("can't convert this type to mysql type. type = {}",
+                                     type.debug_string());
+    }
+    }
+    return Status::OK();
+}
 } // namespace doris
diff --git a/be/src/exec/table_connector.h b/be/src/exec/table_connector.h
index 3fa9f5f5b1..f051776b49 100644
--- a/be/src/exec/table_connector.h
+++ b/be/src/exec/table_connector.h
@@ -18,6 +18,7 @@
 #pragma once
 
 #include <fmt/format.h>
+#include <gen_cpp/Types_types.h>
 
 #include <boost/format.hpp>
 #include <cstdlib>
@@ -58,12 +59,16 @@ public:
     Status append(const std::string& table_name, vectorized::Block* block,
                   const std::vector<vectorized::VExprContext*>& _output_vexpr_ctxs,
                   uint32_t start_send_row, uint32_t* num_rows_sent,
-                  bool need_extra_convert = false);
+                  TOdbcTableType::type table_type = TOdbcTableType::MYSQL);
 
     void init_profile(RuntimeProfile*);
 
     std::u16string utf8_to_u16string(const char* first, const char* last);
 
+    Status convert_column_data(const vectorized::ColumnPtr& column_ptr,
+                               const vectorized::DataTypePtr& type_ptr, const TypeDescriptor& type,
+                               int row, TOdbcTableType::type table_type);
+
     virtual Status close() { return Status::OK(); }
 
 protected:
diff --git a/be/src/vec/core/column_with_type_and_name.cpp b/be/src/vec/core/column_with_type_and_name.cpp
index e141efe8f3..e196935bff 100644
--- a/be/src/vec/core/column_with_type_and_name.cpp
+++ b/be/src/vec/core/column_with_type_and_name.cpp
@@ -50,7 +50,7 @@ void ColumnWithTypeAndName::dump_structure(std::ostream& out) const {
     out << name;
 
     if (type)
-        out << " ";
+        out << " " << type->get_name();
     else
         out << " nullptr";
 
diff --git a/be/src/vec/exec/scan/new_jdbc_scanner.cpp b/be/src/vec/exec/scan/new_jdbc_scanner.cpp
index cbb9588bbd..edfb843733 100644
--- a/be/src/vec/exec/scan/new_jdbc_scanner.cpp
+++ b/be/src/vec/exec/scan/new_jdbc_scanner.cpp
@@ -125,7 +125,7 @@ Status NewJdbcScanner::_get_block_impl(RuntimeState* state, Block* block, bool*
             }
         }
 
-        RETURN_IF_ERROR(_jdbc_connector->get_next(&_jdbc_eos, columns, state->batch_size()));
+        RETURN_IF_ERROR(_jdbc_connector->get_next(&_jdbc_eos, columns, block, state->batch_size()));
 
         if (_jdbc_eos == true) {
             if (block->rows() == 0) {
diff --git a/be/src/vec/exec/vjdbc_connector.cpp b/be/src/vec/exec/vjdbc_connector.cpp
index f2302f3baa..ab7fef7b5b 100644
--- a/be/src/vec/exec/vjdbc_connector.cpp
+++ b/be/src/vec/exec/vjdbc_connector.cpp
@@ -25,8 +25,11 @@
 #include "runtime/define_primitive_type.h"
 #include "runtime/user_function_cache.h"
 #include "util/jni-util.h"
+#include "vec/columns/column_array.h"
 #include "vec/columns/column_nullable.h"
-#include "vec/exprs/vexpr.h"
+#include "vec/data_types/data_type_factory.hpp"
+#include "vec/data_types/data_type_string.h"
+#include "vec/functions/simple_function_factory.h"
 
 namespace doris {
 namespace vectorized {
@@ -36,6 +39,8 @@ const char* JDBC_EXECUTOR_WRITE_SIGNATURE = "(Ljava/lang/String;)I";
 const char* JDBC_EXECUTOR_HAS_NEXT_SIGNATURE = "()Z";
 const char* JDBC_EXECUTOR_GET_BLOCK_SIGNATURE = "(I)Ljava/util/List;";
 const char* JDBC_EXECUTOR_GET_TYPES_SIGNATURE = "()Ljava/util/List;";
+const char* JDBC_EXECUTOR_GET_ARR_LIST_SIGNATURE = "(Ljava/lang/Object;)Ljava/util/List;";
+const char* JDBC_EXECUTOR_GET_ARR_TYPE_SIGNATURE = "()I";
 const char* JDBC_EXECUTOR_CLOSE_SIGNATURE = "()V";
 const char* JDBC_EXECUTOR_CONVERT_DATE_SIGNATURE = "(Ljava/lang/Object;)J";
 const char* JDBC_EXECUTOR_CONVERT_DATETIME_SIGNATURE = "(Ljava/lang/Object;)J";
@@ -195,7 +200,7 @@ Status JdbcConnector::_check_column_type() {
                 env->CallObjectMethod(type_lists, _executor_get_list_id, materialized_column_index);
 
         const std::string& type_str = _jobject_to_string(env, column_type);
-        RETURN_IF_ERROR(_check_type(slot_desc, type_str));
+        RETURN_IF_ERROR(_check_type(slot_desc, type_str, column_index));
         env->DeleteLocalRef(column_type);
         materialized_column_index++;
     }
@@ -222,7 +227,8 @@ DATETIME     java.sql.Timestamp         java.sql.Timestamp          java.sql.Tim
 NOTE: because oracle always use number(p,s) to create all numerical type, so it's java type maybe java.math.BigDecimal
 */
 
-Status JdbcConnector::_check_type(SlotDescriptor* slot_desc, const std::string& type_str) {
+Status JdbcConnector::_check_type(SlotDescriptor* slot_desc, const std::string& type_str,
+                                  int column_index) {
     const std::string error_msg = fmt::format(
             "Fail to convert jdbc type of {} to doris type {} on column: {}. You need to "
             "check this column type between external table and doris table.",
@@ -289,6 +295,30 @@ Status JdbcConnector::_check_type(SlotDescriptor* slot_desc, const std::string&
         }
         break;
     }
+    case TYPE_ARRAY: {
+        if (type_str != "java.sql.Array" && type_str != "java.lang.String") {
+            return Status::InternalError(error_msg);
+        }
+        if (!slot_desc->type().children[0].children.empty()) {
+            return Status::InternalError("Now doris not support nested array type in array {}.",
+                                         slot_desc->type().debug_string());
+        }
+        // when type is array, all databases except pg read it as a string and cast it to array
+        if (_conn_param.table_type != TOdbcTableType::POSTGRESQL) {
+            _need_cast_array_type = true;
+            _map_column_idx_to_cast_idx[column_index] = _input_array_string_types.size();
+            if (slot_desc->is_nullable()) {
+                _input_array_string_types.push_back(
+                        make_nullable(std::make_shared<DataTypeString>()));
+            } else {
+                _input_array_string_types.push_back(std::make_shared<DataTypeString>());
+            }
+            str_array_cols.push_back(
+                    _input_array_string_types[_map_column_idx_to_cast_idx[column_index]]
+                            ->create_column());
+        }
+        break;
+    }
     default: {
         return Status::InternalError(error_msg);
     }
@@ -296,7 +326,8 @@ Status JdbcConnector::_check_type(SlotDescriptor* slot_desc, const std::string&
     return Status::OK();
 }
 
-Status JdbcConnector::get_next(bool* eos, std::vector<MutableColumnPtr>& columns, int batch_size) {
+Status JdbcConnector::get_next(bool* eos, std::vector<MutableColumnPtr>& columns, Block* block,
+                               int batch_size) {
     if (!_is_open) {
         return Status::InternalError("get_next before open of jdbc connector.");
     }
@@ -322,17 +353,23 @@ Status JdbcConnector::get_next(bool* eos, std::vector<MutableColumnPtr>& columns
         if (!slot_desc->is_materialized()) {
             continue;
         }
+        const std::string& column_name = slot_desc->col_name();
         jobject column_data =
                 env->CallObjectMethod(block_obj, _executor_get_list_id, materialized_column_index);
         jint num_rows = env->CallIntMethod(column_data, _executor_get_list_size_id);
 
         for (int row = 0; row < num_rows; ++row) {
             jobject cur_data = env->CallObjectMethod(column_data, _executor_get_list_id, row);
-            _convert_column_data(env, cur_data, slot_desc, columns[column_index].get());
+            RETURN_IF_ERROR(_convert_column_data(env, cur_data, slot_desc,
+                                                 columns[column_index].get(), column_index,
+                                                 column_name));
             env->DeleteLocalRef(cur_data);
         }
         env->DeleteLocalRef(column_data);
-
+        // here the string column needs to be cast to the array type
+        if (_need_cast_array_type && slot_desc->type().is_array_type()) {
+            _cast_string_to_array(slot_desc, block, column_index, num_rows);
+        }
         materialized_column_index++;
     }
     // All Java objects returned by JNI functions are local references.
@@ -385,25 +422,43 @@ Status JdbcConnector::_register_func_id(JNIEnv* env) {
                                 JDBC_EXECUTOR_TRANSACTION_SIGNATURE, _executor_abort_trans_id));
     RETURN_IF_ERROR(register_id(_executor_clazz, "getResultColumnTypeNames",
                                 JDBC_EXECUTOR_GET_TYPES_SIGNATURE, _executor_get_types_id));
+    RETURN_IF_ERROR(register_id(_executor_clazz, "getArrayColumnData",
+                                JDBC_EXECUTOR_GET_ARR_LIST_SIGNATURE, _executor_get_arr_list_id));
+    RETURN_IF_ERROR(register_id(_executor_clazz, "getBaseTypeInt",
+                                JDBC_EXECUTOR_GET_ARR_TYPE_SIGNATURE, _executor_get_arr_type_id));
+
     return Status::OK();
 }
 
 Status JdbcConnector::_convert_column_data(JNIEnv* env, jobject jobj,
                                            const SlotDescriptor* slot_desc,
-                                           vectorized::IColumn* column_ptr) {
+                                           vectorized::IColumn* column_ptr, int column_index,
+                                           std::string_view column_name) {
     vectorized::IColumn* col_ptr = column_ptr;
     if (true == slot_desc->is_nullable()) {
         auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(column_ptr);
         if (jobj == nullptr) {
             nullable_column->insert_data(nullptr, 0);
+            if (_need_cast_array_type && slot_desc->type().type == TYPE_ARRAY) {
+                reinterpret_cast<vectorized::ColumnNullable*>(
+                        str_array_cols[_map_column_idx_to_cast_idx[column_index]].get())
+                        ->insert_data(nullptr, 0);
+            }
             return Status::OK();
         } else {
             nullable_column->get_null_map_data().push_back(0);
             col_ptr = &nullable_column->get_nested_column();
         }
     }
+    RETURN_IF_ERROR(
+            _insert_column_data(env, jobj, slot_desc->type(), col_ptr, column_index, column_name));
+    return Status::OK();
+}
 
-    switch (slot_desc->type().type) {
+Status JdbcConnector::_insert_column_data(JNIEnv* env, jobject jobj, const TypeDescriptor& type,
+                                          vectorized::IColumn* col_ptr, int column_index,
+                                          std::string_view column_name) {
+    switch (type.type) {
 #define M(TYPE, CPP_TYPE, COLUMN_TYPE)                              \
     case TYPE: {                                                    \
         CPP_TYPE num = _jobject_to_##CPP_TYPE(env, jobj);           \
@@ -468,8 +523,7 @@ Status JdbcConnector::_convert_column_data(JNIEnv* env, jobject jobj,
         std::string data = _jobject_to_string(env, jobj);
         StringParser::ParseResult result = StringParser::PARSE_SUCCESS;
         const Int32 decimal_slot = StringParser::string_to_decimal<Int32>(
-                data.c_str(), data.length(), slot_desc->type().precision, slot_desc->type().scale,
-                &result);
+                data.c_str(), data.length(), type.precision, type.scale, &result);
         reinterpret_cast<vectorized::ColumnDecimal32*>(col_ptr)->insert_data(
                 reinterpret_cast<const char*>(&decimal_slot), 0);
         break;
@@ -478,8 +532,7 @@ Status JdbcConnector::_convert_column_data(JNIEnv* env, jobject jobj,
         std::string data = _jobject_to_string(env, jobj);
         StringParser::ParseResult result = StringParser::PARSE_SUCCESS;
         const Int64 decimal_slot = StringParser::string_to_decimal<Int64>(
-                data.c_str(), data.length(), slot_desc->type().precision, slot_desc->type().scale,
-                &result);
+                data.c_str(), data.length(), type.precision, type.scale, &result);
         reinterpret_cast<vectorized::ColumnDecimal64*>(col_ptr)->insert_data(
                 reinterpret_cast<const char*>(&decimal_slot), 0);
         break;
@@ -488,22 +541,114 @@ Status JdbcConnector::_convert_column_data(JNIEnv* env, jobject jobj,
         std::string data = _jobject_to_string(env, jobj);
         StringParser::ParseResult result = StringParser::PARSE_SUCCESS;
         const Int128 decimal_slot = StringParser::string_to_decimal<Int128>(
-                data.c_str(), data.length(), slot_desc->type().precision, slot_desc->type().scale,
-                &result);
+                data.c_str(), data.length(), type.precision, type.scale, &result);
         reinterpret_cast<vectorized::ColumnDecimal128I*>(col_ptr)->insert_data(
                 reinterpret_cast<const char*>(&decimal_slot), 0);
         break;
     }
+    case TYPE_ARRAY: {
+        if (_need_cast_array_type) {
+            // The array arrives as one big string such as "[1,2,3]"; we must cast it ourselves.
+            std::string data = _jobject_to_string(env, jobj);
+            str_array_cols[_map_column_idx_to_cast_idx[column_index]]->insert_data(data.c_str(),
+                                                                                   data.length());
+        } else {
+            // PostgreSQL returns the array as an Object[], so elements can be read by index.
+            jobject arr_lists = env->CallNonvirtualObjectMethod(_executor_obj, _executor_clazz,
+                                                                _executor_get_arr_list_id, jobj);
+            jint arr_type = env->CallNonvirtualIntMethod(_executor_obj, _executor_clazz,
+                                                         _executor_get_arr_type_id);
+            // This type check may be unnecessary; extra checks cost performance.
+            if (_arr_jdbc_map[arr_type] != type.children[0].type) {
+                const std::string& error_msg = fmt::format(
+                        "Fail to convert jdbc value to array type of {} on column: {}, could check "
+                        "this column type between external table and doris table. {}.{} ",
+                        type.children[0].debug_string(), column_name, _arr_jdbc_map[arr_type],
+                        arr_type);
+                return Status::InternalError(std::string(error_msg));
+            }
+            jint num_rows = env->CallIntMethod(arr_lists, _executor_get_list_size_id);
+            RETURN_IF_ERROR(_insert_arr_column_data(env, arr_lists, type.children[0], num_rows,
+                                                    col_ptr, column_index, column_name));
+            env->DeleteLocalRef(arr_lists);
+        }
+        break;
+    }
     default: {
-        std::string error_msg =
-                fmt::format("Fail to convert jdbc value to {} on column: {}.",
-                            slot_desc->type().debug_string(), slot_desc->col_name());
+        const std::string& error_msg = fmt::format(
+                "Fail to convert jdbc value to {} on column: {}, could check this column type "
+                "between external table and doris table.",
+                type.debug_string(), column_name);
         return Status::InternalError(std::string(error_msg));
     }
     }
     return Status::OK();
 }
 
+// Appends one array row: inserts the `nums` elements of the Java list `arr_lists` into the
+// nested nullable column of `arr_column_ptr`, then records the new end offset of the row.
+// Returns the first error reported while converting an element, otherwise OK.
+Status JdbcConnector::_insert_arr_column_data(JNIEnv* env, jobject arr_lists,
+                                              const TypeDescriptor& type, int nums,
+                                              vectorized::IColumn* arr_column_ptr, int column_index,
+                                              std::string_view column_name) {
+    auto* arr_column = reinterpret_cast<vectorized::ColumnArray*>(arr_column_ptr);
+    auto& arr_nested = arr_column->get_data();
+    auto& nullable_nested = reinterpret_cast<vectorized::ColumnNullable&>(arr_nested);
+    vectorized::IColumn* col_ptr = nullable_nested.get_nested_column_ptr();
+    auto& nullmap_data = nullable_nested.get_null_map_data();
+    for (int i = 0; i < nums; ++i) {
+        jobject cur_data = env->CallObjectMethod(arr_lists, _executor_get_list_id, i);
+        if (cur_data == nullptr) {
+            arr_nested.insert_default(); // Java null element
+            continue;
+        }
+        nullmap_data.push_back(0);
+        Status st = _insert_column_data(env, cur_data, type, col_ptr, column_index, column_name);
+        // Drop the JNI local reference before a possible early return so it cannot leak.
+        env->DeleteLocalRef(cur_data);
+        RETURN_IF_ERROR(st);
+    }
+    // The row's end offset is the previous end offset plus the number of elements added.
+    arr_column->get_offsets().push_back(nums + arr_column->get_offsets().back());
+    return Status::OK();
+}
+
+// Casts the buffered string column of an array slot (text such as "[1,2,3]") to the slot's
+// array type through the CAST function, writes it to `block`, and reports a failed cast.
+Status JdbcConnector::_cast_string_to_array(const SlotDescriptor* slot_desc, Block* block,
+                                            int column_index, int rows) {
+    const int cast_idx = _map_column_idx_to_cast_idx[column_index]; // look up once, reuse
+    DataTypePtr _target_data_type = slot_desc->get_data_type_ptr();
+    std::string _target_data_type_name = DataTypeFactory::instance().get(_target_data_type);
+    DataTypePtr _cast_param_data_type = std::make_shared<DataTypeString>();
+    ColumnPtr _cast_param = _cast_param_data_type->create_column_const(1, _target_data_type_name);
+
+    ColumnsWithTypeAndName argument_template;
+    argument_template.reserve(2);
+    argument_template.emplace_back(std::move(str_array_cols[cast_idx]),
+                                   _input_array_string_types[cast_idx], "java.sql.String");
+    // Re-create the moved-from buffer now so it stays usable even if the cast fails below.
+    str_array_cols[cast_idx] = _input_array_string_types[cast_idx]->create_column();
+    argument_template.emplace_back(_cast_param, _cast_param_data_type, _target_data_type_name);
+    FunctionBasePtr func_cast = SimpleFunctionFactory::instance().get_function(
+            "CAST", argument_template, make_nullable(_target_data_type));
+
+    Block cast_block(argument_template);
+    int result_idx = cast_block.columns();
+    cast_block.insert({nullptr, make_nullable(_target_data_type), "cast_result"});
+    RETURN_IF_ERROR(func_cast->execute(nullptr, cast_block, {0, 1}, result_idx, rows));
+
+    auto res_col = cast_block.get_by_position(result_idx).column;
+    if (_target_data_type->is_nullable()) {
+        block->replace_by_position(column_index, res_col);
+    } else { // the cast result is nullable; unwrap it for a non-nullable slot
+        const auto* wrapped = reinterpret_cast<const vectorized::ColumnNullable*>(res_col.get());
+        block->replace_by_position(column_index, wrapped->get_nested_column_ptr());
+    }
+    return Status::OK();
+}
+
 Status JdbcConnector::exec_write_sql(const std::u16string& insert_stmt,
                                      const fmt::memory_buffer& insert_stmt_buffer) {
     SCOPED_TIMER(_result_send_timer);
diff --git a/be/src/vec/exec/vjdbc_connector.h b/be/src/vec/exec/vjdbc_connector.h
index 84ca17e02c..0d51fd278c 100644
--- a/be/src/vec/exec/vjdbc_connector.h
+++ b/be/src/vec/exec/vjdbc_connector.h
@@ -19,8 +19,12 @@
 
 #include <jni.h>
 
+#include <string_view>
+
 #include "common/status.h"
 #include "exec/table_connector.h"
+#include "runtime/define_primitive_type.h"
+#include "vec/data_types/data_type.h"
 
 namespace doris {
 namespace vectorized {
@@ -51,7 +55,8 @@ public:
     Status exec_write_sql(const std::u16string& insert_stmt,
                           const fmt::memory_buffer& insert_stmt_buffer) override;
 
-    Status get_next(bool* eos, std::vector<MutableColumnPtr>& columns, int batch_size);
+    Status get_next(bool* eos, std::vector<MutableColumnPtr>& columns, Block* block,
+                    int batch_size);
 
     // use in JDBC transaction
     Status begin_trans() override; // should be call after connect and before query or init_to_write
@@ -63,14 +68,28 @@ public:
 private:
     Status _register_func_id(JNIEnv* env);
     Status _check_column_type();
-    Status _check_type(SlotDescriptor*, const std::string& type_str);
+    Status _check_type(SlotDescriptor*, const std::string& type_str, int column_index);
     Status _convert_column_data(JNIEnv* env, jobject jobj, const SlotDescriptor* slot_desc,
-                                vectorized::IColumn* column_ptr);
+                                vectorized::IColumn* column_ptr, int column_index,
+                                std::string_view column_name);
+    Status _insert_column_data(JNIEnv* env, jobject jobj, const TypeDescriptor& type,
+                               vectorized::IColumn* column_ptr, int column_index,
+                               std::string_view column_name);
+    Status _insert_arr_column_data(JNIEnv* env, jobject jobj, const TypeDescriptor& type, int nums,
+                                   vectorized::IColumn* column_ptr, int column_index,
+                                   std::string_view column_name);
     std::string _jobject_to_string(JNIEnv* env, jobject jobj);
     int64_t _jobject_to_date(JNIEnv* env, jobject jobj);
     int64_t _jobject_to_datetime(JNIEnv* env, jobject jobj);
+    Status _cast_string_to_array(const SlotDescriptor* slot_desc, Block* block, int column_index,
+                                 int rows);
 
     const JdbcConnectorParam& _conn_param;
+    //java.sql.Types: https://docs.oracle.com/javase/7/docs/api/constant-values.html#java.sql.Types.INTEGER
+    std::map<int, PrimitiveType> _arr_jdbc_map {
+            {-7, TYPE_BOOLEAN}, {-6, TYPE_TINYINT},  {5, TYPE_SMALLINT}, {4, TYPE_INT},
+            {-5, TYPE_BIGINT},  {12, TYPE_STRING},   {7, TYPE_FLOAT},    {8, TYPE_DOUBLE},
+            {91, TYPE_DATE},    {93, TYPE_DATETIME}, {2, TYPE_DECIMALV2}};
     bool _closed;
     jclass _executor_clazz;
     jclass _executor_list_clazz;
@@ -83,6 +102,8 @@ private:
     jmethodID _executor_has_next_id;
     jmethodID _executor_get_blocks_id;
     jmethodID _executor_get_types_id;
+    jmethodID _executor_get_arr_list_id;
+    jmethodID _executor_get_arr_type_id;
     jmethodID _executor_close_id;
     jmethodID _executor_get_list_id;
     jmethodID _executor_get_list_size_id;
@@ -93,6 +114,11 @@ private:
     jmethodID _executor_begin_trans_id;
     jmethodID _executor_finish_trans_id;
     jmethodID _executor_abort_trans_id;
+    bool _need_cast_array_type;
+    std::map<int, int> _map_column_idx_to_cast_idx;
+    std::vector<DataTypePtr> _input_array_string_types;
+    std::vector<MutableColumnPtr>
+            str_array_cols; // for array type to save data like big string [1,2,3]
 
 #define FUNC_VARI_DECLARE(RETURN_TYPE)                                \
     RETURN_TYPE _jobject_to_##RETURN_TYPE(JNIEnv* env, jobject jobj); \
diff --git a/be/src/vec/sink/vjdbc_table_sink.cpp b/be/src/vec/sink/vjdbc_table_sink.cpp
index 0d9e457cff..dba302a52b 100644
--- a/be/src/vec/sink/vjdbc_table_sink.cpp
+++ b/be/src/vec/sink/vjdbc_table_sink.cpp
@@ -44,9 +44,9 @@ Status VJdbcTableSink::init(const TDataSink& t_sink) {
     _jdbc_param.driver_path = t_jdbc_sink.jdbc_table.jdbc_driver_url;
     _jdbc_param.driver_checksum = t_jdbc_sink.jdbc_table.jdbc_driver_checksum;
     _jdbc_param.resource_name = t_jdbc_sink.jdbc_table.jdbc_resource_name;
+    _jdbc_param.table_type = t_jdbc_sink.table_type;
     _table_name = t_jdbc_sink.jdbc_table.jdbc_table_name;
     _use_transaction = t_jdbc_sink.use_transaction;
-    _need_extra_convert = (t_jdbc_sink.table_type == TOdbcTableType::ORACLE);
 
     return Status::OK();
 }
@@ -81,7 +81,7 @@ Status VJdbcTableSink::send(RuntimeState* state, Block* block) {
     uint32_t num_row_sent = 0;
     while (start_send_row < output_block.rows()) {
         RETURN_IF_ERROR(_writer->append(_table_name, &output_block, _output_vexpr_ctxs,
-                                        start_send_row, &num_row_sent, _need_extra_convert));
+                                        start_send_row, &num_row_sent, _jdbc_param.table_type));
         start_send_row += num_row_sent;
         num_row_sent = 0;
     }
diff --git a/be/src/vec/sink/vjdbc_table_sink.h b/be/src/vec/sink/vjdbc_table_sink.h
index 52238942a2..d6e91ef0b7 100644
--- a/be/src/vec/sink/vjdbc_table_sink.h
+++ b/be/src/vec/sink/vjdbc_table_sink.h
@@ -39,8 +39,6 @@ public:
 private:
     JdbcConnectorParam _jdbc_param;
     std::unique_ptr<JdbcConnector> _writer;
-    //if is ORACLE and date type, insert into need convert
-    bool _need_extra_convert = false;
 };
 } // namespace vectorized
 } // namespace doris
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ColumnDef.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ColumnDef.java
index 36b556c623..66670db2f2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ColumnDef.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ColumnDef.java
@@ -266,7 +266,7 @@ public class ColumnDef {
             defaultValue = DefaultValue.BITMAP_EMPTY_DEFAULT_VALUE;
         }
 
-        if (type.getPrimitiveType() == PrimitiveType.ARRAY) {
+        if (type.getPrimitiveType() == PrimitiveType.ARRAY && isOlap) {
             if (isKey()) {
                 throw new AnalysisException("Array can only be used in the non-key column of"
                         + " the duplicate table at present.");
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java
index b09ea77ff3..b7994b8e3d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java
@@ -390,7 +390,7 @@ public class CreateTableStmt extends DdlStmt {
         for (ColumnDef columnDef : columnDefs) {
             columnDef.analyze(engineName.equals("olap"));
 
-            if (columnDef.getType().isArrayType()) {
+            if (columnDef.getType().isArrayType() && engineName.equals("olap")) {
                 if (columnDef.getAggregateType() != null && columnDef.getAggregateType() != AggregateType.NONE) {
                     throw new AnalysisException("Array column can't support aggregation "
                             + columnDef.getAggregateType());
diff --git a/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcExecutor.java b/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcExecutor.java
index 9539f65aba..d48a679a6c 100644
--- a/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcExecutor.java
+++ b/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcExecutor.java
@@ -42,6 +42,7 @@ import java.sql.Timestamp;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 
 public class JdbcExecutor {
@@ -54,6 +55,7 @@ public class JdbcExecutor {
     // Use HikariDataSource to help us manage the JDBC connections.
     private HikariDataSource dataSource = null;
     private List<String> resultColumnTypeNames = null;
+    private int baseTypeInt = 0;
 
     public JdbcExecutor(byte[] thriftParams) throws Exception {
         TJdbcExecutorCtorParams request = new TJdbcExecutorCtorParams();
@@ -124,6 +126,20 @@ public class JdbcExecutor {
         }
     }
 
+    public List<Object> getArrayColumnData(Object object) throws UdfRuntimeException {
+        java.sql.Array sqlArray = (java.sql.Array) object; // plain cast, raises no SQLException
+        try {
+            baseTypeInt = sqlArray.getBaseType(); // kept so getBaseTypeInt() can report it
+            return Arrays.asList((Object[]) sqlArray.getArray());
+        } catch (SQLException e) {
+            throw new UdfRuntimeException("JDBC executor getArrayColumnData has error: ", e);
+        }
+    }
+
+    public int getBaseTypeInt() { // base type stored by the last getArrayColumnData() call
+        return this.baseTypeInt;
+    }
+
     public void openTrans() throws UdfRuntimeException {
         try {
             if (conn != null) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org