You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by yi...@apache.org on 2023/06/28 07:01:32 UTC

[doris] branch master updated: [Bug](javaudf) fix BE crash if javaudf is push down (#21139)

This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new a4fdf7324a [Bug](javaudf) fix BE crash if javaudf is push down (#21139)
a4fdf7324a is described below

commit a4fdf7324ae6faca6b33e54add38bd786cf8d3b7
Author: Gabriel <ga...@gmail.com>
AuthorDate: Wed Jun 28 15:01:24 2023 +0800

    [Bug](javaudf) fix BE crash if javaudf is push down (#21139)
---
 be/src/runtime/user_function_cache.cpp     |   7 +-
 be/src/vec/functions/function_java_udf.cpp | 232 +++++++++++++++--------------
 be/src/vec/functions/function_java_udf.h   |  23 +--
 be/src/vec/functions/function_jsonb.cpp    |  41 +++--
 be/src/vec/functions/function_rpc.cpp      |  14 +-
 be/src/vec/functions/function_rpc.h        |   1 -
 6 files changed, 177 insertions(+), 141 deletions(-)

diff --git a/be/src/runtime/user_function_cache.cpp b/be/src/runtime/user_function_cache.cpp
index 25e7405a0f..f7ec0890a6 100644
--- a/be/src/runtime/user_function_cache.cpp
+++ b/be/src/runtime/user_function_cache.cpp
@@ -140,8 +140,9 @@ Status UserFunctionCache::_load_entry_from_lib(const std::string& dir, const std
     }
 
     std::vector<std::string> split_parts = strings::Split(file, ".");
-    if (split_parts.size() != 3) {
-        return Status::InternalError("user function's name should be function_id.checksum.so");
+    if (split_parts.size() != 3 && split_parts.size() != 4) {
+        return Status::InternalError(
+                "user function's name should be function_id.checksum[.file_name].file_type");
     }
     int64_t function_id = std::stol(split_parts[0]);
     std::string checksum = split_parts[1];
@@ -176,7 +177,7 @@ Status UserFunctionCache::_load_cached_lib() {
             auto st = _load_entry_from_lib(sub_dir, file.file_name);
             if (!st.ok()) {
                 LOG(WARNING) << "load a library failed, dir=" << sub_dir
-                             << ", file=" << file.file_name;
+                             << ", file=" << file.file_name << ": " << st.to_string();
             }
             return true;
         };
diff --git a/be/src/vec/functions/function_java_udf.cpp b/be/src/vec/functions/function_java_udf.cpp
index fe96c92051..9305bae949 100644
--- a/be/src/vec/functions/function_java_udf.cpp
+++ b/be/src/vec/functions/function_java_udf.cpp
@@ -54,16 +54,24 @@ Status JavaFunctionCall::open(FunctionContext* context, FunctionContext::Functio
     if (env == nullptr) {
         return Status::InternalError("Failed to get/create JVM");
     }
-    RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, EXECUTOR_CLASS, &executor_cl_));
-    executor_ctor_id_ = env->GetMethodID(executor_cl_, "<init>", EXECUTOR_CTOR_SIGNATURE);
-    RETURN_ERROR_IF_EXC(env);
-    executor_evaluate_id_ = env->GetMethodID(executor_cl_, "evaluate", EXECUTOR_EVALUATE_SIGNATURE);
-    RETURN_ERROR_IF_EXC(env);
-    executor_close_id_ = env->GetMethodID(executor_cl_, "close", EXECUTOR_CLOSE_SIGNATURE);
-    RETURN_ERROR_IF_EXC(env);
-
-    std::shared_ptr<JniContext> jni_ctx =
-            std::make_shared<JniContext>(_argument_types.size(), this);
+    if (scope == FunctionContext::FunctionStateScope::FRAGMENT_LOCAL) {
+        std::shared_ptr<JniEnv> jni_env = std::make_shared<JniEnv>();
+        RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, EXECUTOR_CLASS, &jni_env->executor_cl));
+        jni_env->executor_ctor_id =
+                env->GetMethodID(jni_env->executor_cl, "<init>", EXECUTOR_CTOR_SIGNATURE);
+        RETURN_ERROR_IF_EXC(env);
+        jni_env->executor_evaluate_id =
+                env->GetMethodID(jni_env->executor_cl, "evaluate", EXECUTOR_EVALUATE_SIGNATURE);
+        RETURN_ERROR_IF_EXC(env);
+        jni_env->executor_close_id =
+                env->GetMethodID(jni_env->executor_cl, "close", EXECUTOR_CLOSE_SIGNATURE);
+        RETURN_ERROR_IF_EXC(env);
+        context->set_function_state(FunctionContext::FRAGMENT_LOCAL, jni_env);
+    }
+    JniEnv* jni_env =
+            reinterpret_cast<JniEnv*>(context->get_function_state(FunctionContext::FRAGMENT_LOCAL));
+    std::shared_ptr<JniContext> jni_ctx = std::make_shared<JniContext>(
+            _argument_types.size(), jni_env->executor_cl, jni_env->executor_close_id);
     context->set_function_state(FunctionContext::THREAD_LOCAL, jni_ctx);
 
     // Add a scoped cleanup jni reference object. This cleans up local refs made below.
@@ -99,7 +107,9 @@ Status JavaFunctionCall::open(FunctionContext* context, FunctionContext::Functio
         RETURN_IF_ERROR(jni_frame.push(env));
 
         RETURN_IF_ERROR(SerializeThriftMsg(env, &ctor_params, &ctor_params_bytes));
-        jni_ctx->executor = env->NewObject(executor_cl_, executor_ctor_id_, ctor_params_bytes);
+
+        jni_ctx->executor =
+                env->NewObject(jni_env->executor_cl, jni_env->executor_ctor_id, ctor_params_bytes);
 
         jbyte* pBytes = env->GetByteArrayElements(ctor_params_bytes, nullptr);
         env->ReleaseByteArrayElements(ctor_params_bytes, pBytes, JNI_ABORT);
@@ -118,6 +128,8 @@ Status JavaFunctionCall::execute(FunctionContext* context, Block& block,
     RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env));
     JniContext* jni_ctx = reinterpret_cast<JniContext*>(
             context->get_function_state(FunctionContext::THREAD_LOCAL));
+    JniEnv* jni_env =
+            reinterpret_cast<JniEnv*>(context->get_function_state(FunctionContext::FRAGMENT_LOCAL));
     int arg_idx = 0;
     ColumnPtr data_cols[arguments.size()];
     ColumnPtr null_cols[arguments.size()];
@@ -192,105 +204,105 @@ Status JavaFunctionCall::execute(FunctionContext* context, Block& block,
 
         *(jni_ctx->output_null_value) = reinterpret_cast<int64_t>(null_col->get_data().data());
 #ifndef EVALUATE_JAVA_UDF
-#define EVALUATE_JAVA_UDF                                                                          \
-    if (data_col->is_column_string()) {                                                            \
-        const ColumnString* str_col = assert_cast<const ColumnString*>(data_col.get());            \
-        ColumnString::Chars& chars = const_cast<ColumnString::Chars&>(str_col->get_chars());       \
-        ColumnString::Offsets& offsets =                                                           \
-                const_cast<ColumnString::Offsets&>(str_col->get_offsets());                        \
-        int increase_buffer_size = 0;                                                              \
-        int64_t buffer_size = JniUtil::IncreaseReservedBufferSize(increase_buffer_size);           \
-        chars.resize(buffer_size);                                                                 \
-        offsets.resize(num_rows);                                                                  \
-        *(jni_ctx->output_value_buffer) = reinterpret_cast<int64_t>(chars.data());                 \
-        *(jni_ctx->output_offsets_ptr) = reinterpret_cast<int64_t>(offsets.data());                \
-        jni_ctx->output_intermediate_state_ptr->row_idx = 0;                                       \
-        jni_ctx->output_intermediate_state_ptr->buffer_size = buffer_size;                         \
-        env->CallNonvirtualVoidMethodA(jni_ctx->executor, executor_cl_, executor_evaluate_id_,     \
-                                       nullptr);                                                   \
-        while (jni_ctx->output_intermediate_state_ptr->row_idx < num_rows) {                       \
-            increase_buffer_size++;                                                                \
-            buffer_size = JniUtil::IncreaseReservedBufferSize(increase_buffer_size);               \
-            chars.resize(buffer_size);                                                             \
-            *(jni_ctx->output_value_buffer) = reinterpret_cast<int64_t>(chars.data());             \
-            jni_ctx->output_intermediate_state_ptr->buffer_size = buffer_size;                     \
-            env->CallNonvirtualVoidMethodA(jni_ctx->executor, executor_cl_, executor_evaluate_id_, \
-                                           nullptr);                                               \
-        }                                                                                          \
-    } else if (data_col->is_numeric() || data_col->is_column_decimal()) {                          \
-        data_col->resize(num_rows);                                                                \
-        *(jni_ctx->output_value_buffer) =                                                          \
-                reinterpret_cast<int64_t>(data_col->get_raw_data().data);                          \
-        env->CallNonvirtualVoidMethodA(jni_ctx->executor, executor_cl_, executor_evaluate_id_,     \
-                                       nullptr);                                                   \
-    } else if (data_col->is_column_array()) {                                                      \
-        ColumnArray* array_col = assert_cast<ColumnArray*>(data_col.get());                        \
-        ColumnNullable& array_nested_nullable =                                                    \
-                assert_cast<ColumnNullable&>(array_col->get_data());                               \
-        auto data_column_null_map = array_nested_nullable.get_null_map_column_ptr();               \
-        auto data_column = array_nested_nullable.get_nested_column_ptr();                          \
-        auto& offset_column = array_col->get_offsets_column();                                     \
-        int increase_buffer_size = 0;                                                              \
-        int64_t buffer_size = JniUtil::IncreaseReservedBufferSize(increase_buffer_size);           \
-        offset_column.resize(num_rows);                                                            \
-        *(jni_ctx->output_offsets_ptr) =                                                           \
-                reinterpret_cast<int64_t>(offset_column.get_raw_data().data);                      \
-        data_column_null_map->resize(buffer_size);                                                 \
-        auto& null_map_data =                                                                      \
-                assert_cast<ColumnVector<UInt8>*>(data_column_null_map.get())->get_data();         \
-        *(jni_ctx->output_array_null_ptr) = reinterpret_cast<int64_t>(null_map_data.data());       \
-        jni_ctx->output_intermediate_state_ptr->row_idx = 0;                                       \
-        jni_ctx->output_intermediate_state_ptr->buffer_size = buffer_size;                         \
-        if (data_column->is_column_string()) {                                                     \
-            ColumnString* str_col = assert_cast<ColumnString*>(data_column.get());                 \
-            ColumnString::Chars& chars = assert_cast<ColumnString::Chars&>(str_col->get_chars());  \
-            ColumnString::Offsets& offsets =                                                       \
-                    assert_cast<ColumnString::Offsets&>(str_col->get_offsets());                   \
-            chars.resize(buffer_size);                                                             \
-            offsets.resize(buffer_size);                                                           \
-            *(jni_ctx->output_value_buffer) = reinterpret_cast<int64_t>(chars.data());             \
-            *(jni_ctx->output_array_string_offsets_ptr) =                                          \
-                    reinterpret_cast<int64_t>(offsets.data());                                     \
-            env->CallNonvirtualVoidMethodA(jni_ctx->executor, executor_cl_, executor_evaluate_id_, \
-                                           nullptr);                                               \
-            while (jni_ctx->output_intermediate_state_ptr->row_idx < num_rows) {                   \
-                increase_buffer_size++;                                                            \
-                buffer_size = JniUtil::IncreaseReservedBufferSize(increase_buffer_size);           \
-                null_map_data.resize(buffer_size);                                                 \
-                chars.resize(buffer_size);                                                         \
-                offsets.resize(buffer_size);                                                       \
-                *(jni_ctx->output_array_null_ptr) =                                                \
-                        reinterpret_cast<int64_t>(null_map_data.data());                           \
-                *(jni_ctx->output_value_buffer) = reinterpret_cast<int64_t>(chars.data());         \
-                *(jni_ctx->output_array_string_offsets_ptr) =                                      \
-                        reinterpret_cast<int64_t>(offsets.data());                                 \
-                jni_ctx->output_intermediate_state_ptr->buffer_size = buffer_size;                 \
-                env->CallNonvirtualVoidMethodA(jni_ctx->executor, executor_cl_,                    \
-                                               executor_evaluate_id_, nullptr);                    \
-            }                                                                                      \
-        } else {                                                                                   \
-            data_column->resize(buffer_size);                                                      \
-            *(jni_ctx->output_value_buffer) =                                                      \
-                    reinterpret_cast<int64_t>(data_column->get_raw_data().data);                   \
-            env->CallNonvirtualVoidMethodA(jni_ctx->executor, executor_cl_, executor_evaluate_id_, \
-                                           nullptr);                                               \
-            while (jni_ctx->output_intermediate_state_ptr->row_idx < num_rows) {                   \
-                increase_buffer_size++;                                                            \
-                buffer_size = JniUtil::IncreaseReservedBufferSize(increase_buffer_size);           \
-                null_map_data.resize(buffer_size);                                                 \
-                data_column->resize(buffer_size);                                                  \
-                *(jni_ctx->output_array_null_ptr) =                                                \
-                        reinterpret_cast<int64_t>(null_map_data.data());                           \
-                *(jni_ctx->output_value_buffer) =                                                  \
-                        reinterpret_cast<int64_t>(data_column->get_raw_data().data);               \
-                jni_ctx->output_intermediate_state_ptr->buffer_size = buffer_size;                 \
-                env->CallNonvirtualVoidMethodA(jni_ctx->executor, executor_cl_,                    \
-                                               executor_evaluate_id_, nullptr);                    \
-            }                                                                                      \
-        }                                                                                          \
-    } else {                                                                                       \
-        return Status::InvalidArgument(strings::Substitute(                                        \
-                "Java UDF doesn't support return type $0 now !", return_type->get_name()));        \
+#define EVALUATE_JAVA_UDF                                                                         \
+    if (data_col->is_column_string()) {                                                           \
+        const ColumnString* str_col = assert_cast<const ColumnString*>(data_col.get());           \
+        ColumnString::Chars& chars = const_cast<ColumnString::Chars&>(str_col->get_chars());      \
+        ColumnString::Offsets& offsets =                                                          \
+                const_cast<ColumnString::Offsets&>(str_col->get_offsets());                       \
+        int increase_buffer_size = 0;                                                             \
+        int64_t buffer_size = JniUtil::IncreaseReservedBufferSize(increase_buffer_size);          \
+        chars.resize(buffer_size);                                                                \
+        offsets.resize(num_rows);                                                                 \
+        *(jni_ctx->output_value_buffer) = reinterpret_cast<int64_t>(chars.data());                \
+        *(jni_ctx->output_offsets_ptr) = reinterpret_cast<int64_t>(offsets.data());               \
+        jni_ctx->output_intermediate_state_ptr->row_idx = 0;                                      \
+        jni_ctx->output_intermediate_state_ptr->buffer_size = buffer_size;                        \
+        env->CallNonvirtualVoidMethodA(jni_ctx->executor, jni_env->executor_cl,                   \
+                                       jni_env->executor_evaluate_id, nullptr);                   \
+        while (jni_ctx->output_intermediate_state_ptr->row_idx < num_rows) {                      \
+            increase_buffer_size++;                                                               \
+            buffer_size = JniUtil::IncreaseReservedBufferSize(increase_buffer_size);              \
+            chars.resize(buffer_size);                                                            \
+            *(jni_ctx->output_value_buffer) = reinterpret_cast<int64_t>(chars.data());            \
+            jni_ctx->output_intermediate_state_ptr->buffer_size = buffer_size;                    \
+            env->CallNonvirtualVoidMethodA(jni_ctx->executor, jni_env->executor_cl,               \
+                                           jni_env->executor_evaluate_id, nullptr);               \
+        }                                                                                         \
+    } else if (data_col->is_numeric() || data_col->is_column_decimal()) {                         \
+        data_col->resize(num_rows);                                                               \
+        *(jni_ctx->output_value_buffer) =                                                         \
+                reinterpret_cast<int64_t>(data_col->get_raw_data().data);                         \
+        env->CallNonvirtualVoidMethodA(jni_ctx->executor, jni_env->executor_cl,                   \
+                                       jni_env->executor_evaluate_id, nullptr);                   \
+    } else if (data_col->is_column_array()) {                                                     \
+        ColumnArray* array_col = assert_cast<ColumnArray*>(data_col.get());                       \
+        ColumnNullable& array_nested_nullable =                                                   \
+                assert_cast<ColumnNullable&>(array_col->get_data());                              \
+        auto data_column_null_map = array_nested_nullable.get_null_map_column_ptr();              \
+        auto data_column = array_nested_nullable.get_nested_column_ptr();                         \
+        auto& offset_column = array_col->get_offsets_column();                                    \
+        int increase_buffer_size = 0;                                                             \
+        int64_t buffer_size = JniUtil::IncreaseReservedBufferSize(increase_buffer_size);          \
+        offset_column.resize(num_rows);                                                           \
+        *(jni_ctx->output_offsets_ptr) =                                                          \
+                reinterpret_cast<int64_t>(offset_column.get_raw_data().data);                     \
+        data_column_null_map->resize(buffer_size);                                                \
+        auto& null_map_data =                                                                     \
+                assert_cast<ColumnVector<UInt8>*>(data_column_null_map.get())->get_data();        \
+        *(jni_ctx->output_array_null_ptr) = reinterpret_cast<int64_t>(null_map_data.data());      \
+        jni_ctx->output_intermediate_state_ptr->row_idx = 0;                                      \
+        jni_ctx->output_intermediate_state_ptr->buffer_size = buffer_size;                        \
+        if (data_column->is_column_string()) {                                                    \
+            ColumnString* str_col = assert_cast<ColumnString*>(data_column.get());                \
+            ColumnString::Chars& chars = assert_cast<ColumnString::Chars&>(str_col->get_chars()); \
+            ColumnString::Offsets& offsets =                                                      \
+                    assert_cast<ColumnString::Offsets&>(str_col->get_offsets());                  \
+            chars.resize(buffer_size);                                                            \
+            offsets.resize(buffer_size);                                                          \
+            *(jni_ctx->output_value_buffer) = reinterpret_cast<int64_t>(chars.data());            \
+            *(jni_ctx->output_array_string_offsets_ptr) =                                         \
+                    reinterpret_cast<int64_t>(offsets.data());                                    \
+            env->CallNonvirtualVoidMethodA(jni_ctx->executor, jni_env->executor_cl,               \
+                                           jni_env->executor_evaluate_id, nullptr);               \
+            while (jni_ctx->output_intermediate_state_ptr->row_idx < num_rows) {                  \
+                increase_buffer_size++;                                                           \
+                buffer_size = JniUtil::IncreaseReservedBufferSize(increase_buffer_size);          \
+                null_map_data.resize(buffer_size);                                                \
+                chars.resize(buffer_size);                                                        \
+                offsets.resize(buffer_size);                                                      \
+                *(jni_ctx->output_array_null_ptr) =                                               \
+                        reinterpret_cast<int64_t>(null_map_data.data());                          \
+                *(jni_ctx->output_value_buffer) = reinterpret_cast<int64_t>(chars.data());        \
+                *(jni_ctx->output_array_string_offsets_ptr) =                                     \
+                        reinterpret_cast<int64_t>(offsets.data());                                \
+                jni_ctx->output_intermediate_state_ptr->buffer_size = buffer_size;                \
+                env->CallNonvirtualVoidMethodA(jni_ctx->executor, jni_env->executor_cl,           \
+                                               jni_env->executor_evaluate_id, nullptr);           \
+            }                                                                                     \
+        } else {                                                                                  \
+            data_column->resize(buffer_size);                                                     \
+            *(jni_ctx->output_value_buffer) =                                                     \
+                    reinterpret_cast<int64_t>(data_column->get_raw_data().data);                  \
+            env->CallNonvirtualVoidMethodA(jni_ctx->executor, jni_env->executor_cl,               \
+                                           jni_env->executor_evaluate_id, nullptr);               \
+            while (jni_ctx->output_intermediate_state_ptr->row_idx < num_rows) {                  \
+                increase_buffer_size++;                                                           \
+                buffer_size = JniUtil::IncreaseReservedBufferSize(increase_buffer_size);          \
+                null_map_data.resize(buffer_size);                                                \
+                data_column->resize(buffer_size);                                                 \
+                *(jni_ctx->output_array_null_ptr) =                                               \
+                        reinterpret_cast<int64_t>(null_map_data.data());                          \
+                *(jni_ctx->output_value_buffer) =                                                 \
+                        reinterpret_cast<int64_t>(data_column->get_raw_data().data);              \
+                jni_ctx->output_intermediate_state_ptr->buffer_size = buffer_size;                \
+                env->CallNonvirtualVoidMethodA(jni_ctx->executor, jni_env->executor_cl,           \
+                                               jni_env->executor_evaluate_id, nullptr);           \
+            }                                                                                     \
+        }                                                                                         \
+    } else {                                                                                      \
+        return Status::InvalidArgument(strings::Substitute(                                       \
+                "Java UDF doesn't support return type $0 now !", return_type->get_name()));       \
     }
 #endif
         EVALUATE_JAVA_UDF;
diff --git a/be/src/vec/functions/function_java_udf.h b/be/src/vec/functions/function_java_udf.h
index 1f47394c69..605d7c0198 100644
--- a/be/src/vec/functions/function_java_udf.h
+++ b/be/src/vec/functions/function_java_udf.h
@@ -23,6 +23,7 @@
 #include <stdint.h>
 
 #include <memory>
+#include <mutex>
 #include <ostream>
 
 #include "common/logging.h"
@@ -83,13 +84,6 @@ private:
     const DataTypes _argument_types;
     const DataTypePtr _return_type;
 
-    /// Global class reference to the UdfExecutor Java class and related method IDs. Set in
-    /// Init(). These have the lifetime of the process (i.e. 'executor_cl_' is never freed).
-    jclass executor_cl_;
-    jmethodID executor_ctor_id_;
-    jmethodID executor_evaluate_id_;
-    jmethodID executor_close_id_;
-
     struct IntermediateState {
         size_t buffer_size;
         size_t row_idx;
@@ -97,6 +91,15 @@ private:
         IntermediateState() : buffer_size(0), row_idx(0) {}
     };
 
+    struct JniEnv {
+        /// Global class reference to the UdfExecutor Java class and related method IDs. Set in
+        /// open() for the FRAGMENT_LOCAL scope. The global ref 'executor_cl' is never freed.
+        jclass executor_cl;
+        jmethodID executor_ctor_id;
+        jmethodID executor_evaluate_id;
+        jmethodID executor_close_id;
+    };
+
     struct JniContext {
         // Do not save parent directly, because parent is in VExpr, but jni context is in FunctionContext
         // The deconstruct sequence is not determined, it will core.
@@ -124,9 +127,9 @@ private:
         // intermediate_state includes two parts: reserved / used buffer size and rows
         std::unique_ptr<IntermediateState> output_intermediate_state_ptr;
 
-        JniContext(int64_t num_args, JavaFunctionCall* parent)
-                : executor_cl_(parent->executor_cl_),
-                  executor_close_id_(parent->executor_close_id_),
+        JniContext(int64_t num_args, jclass executor_cl, jmethodID executor_close_id)
+                : executor_cl_(executor_cl),
+                  executor_close_id_(executor_close_id),
                   input_values_buffer_ptr(new int64_t[num_args]),
                   input_nulls_buffer_ptr(new int64_t[num_args]),
                   input_offsets_ptrs(new int64_t[num_args]),
diff --git a/be/src/vec/functions/function_jsonb.cpp b/be/src/vec/functions/function_jsonb.cpp
index fbe5f11313..ac62c59ef7 100644
--- a/be/src/vec/functions/function_jsonb.cpp
+++ b/be/src/vec/functions/function_jsonb.cpp
@@ -78,8 +78,10 @@ enum class JsonbParseErrorMode { FAIL = 0, RETURN_NULL, RETURN_VALUE, RETURN_INV
 template <NullalbeMode nullable_mode, JsonbParseErrorMode parse_error_handle_mode>
 class FunctionJsonbParseBase : public IFunction {
 private:
-    JsonbParser default_value_parser;
-    bool has_const_default_value = false;
+    struct FunctionJsonbParseState {
+        JsonbParser default_value_parser;
+        bool has_const_default_value = false;
+    };
 
 public:
     static constexpr auto name = "json_parse";
@@ -152,20 +154,31 @@ public:
     bool use_default_implementation_for_nulls() const override { return false; }
 
     Status open(FunctionContext* context, FunctionContext::FunctionStateScope scope) override {
+        if (scope == FunctionContext::FunctionStateScope::FRAGMENT_LOCAL) {
+            std::shared_ptr<FunctionJsonbParseState> state =
+                    std::make_shared<FunctionJsonbParseState>();
+            context->set_function_state(FunctionContext::FRAGMENT_LOCAL, state);
+        }
         if constexpr (parse_error_handle_mode == JsonbParseErrorMode::RETURN_VALUE) {
             if (context->is_col_constant(1)) {
                 const auto default_value_col = context->get_constant_col(1)->column_ptr;
                 const auto& default_value = default_value_col->get_data_at(0);
 
                 JsonbErrType error = JsonbErrType::E_NONE;
-                if (!default_value_parser.parse(default_value.data, default_value.size)) {
-                    error = default_value_parser.getErrorCode();
-                    return Status::InvalidArgument(
-                            "invalid default json value: {} , error: {}",
-                            std::string_view(default_value.data, default_value.size),
-                            JsonbErrMsg::getErrMsg(error));
+                if (scope == FunctionContext::FunctionStateScope::FRAGMENT_LOCAL) {
+                    FunctionJsonbParseState* state = reinterpret_cast<FunctionJsonbParseState*>(
+                            context->get_function_state(FunctionContext::FRAGMENT_LOCAL));
+
+                    if (!state->default_value_parser.parse(default_value.data,
+                                                           default_value.size)) {
+                        error = state->default_value_parser.getErrorCode();
+                        return Status::InvalidArgument(
+                                "invalid default json value: {} , error: {}",
+                                std::string_view(default_value.data, default_value.size),
+                                JsonbErrMsg::getErrMsg(error));
+                    }
+                    state->has_const_default_value = true;
                 }
-                has_const_default_value = true;
             }
         }
         return Status::OK();
@@ -257,10 +270,14 @@ public:
                     continue;
                 }
                 case JsonbParseErrorMode::RETURN_VALUE: {
-                    if (has_const_default_value) {
+                    FunctionJsonbParseState* state = reinterpret_cast<FunctionJsonbParseState*>(
+                            context->get_function_state(FunctionContext::FRAGMENT_LOCAL));
+                    if (state->has_const_default_value) {
                         col_to->insert_data(
-                                default_value_parser.getWriter().getOutput()->getBuffer(),
-                                (size_t)default_value_parser.getWriter().getOutput()->getSize());
+                                state->default_value_parser.getWriter().getOutput()->getBuffer(),
+                                (size_t)state->default_value_parser.getWriter()
+                                        .getOutput()
+                                        ->getSize());
                     } else {
                         auto val = block.get_by_position(arguments[1]).column->get_data_at(i);
                         if (parser.parse(val.data, val.size)) {
diff --git a/be/src/vec/functions/function_rpc.cpp b/be/src/vec/functions/function_rpc.cpp
index dbd92eeec2..44c5e1b369 100644
--- a/be/src/vec/functions/function_rpc.cpp
+++ b/be/src/vec/functions/function_rpc.cpp
@@ -90,16 +90,20 @@ FunctionRPC::FunctionRPC(const TFunction& fn, const DataTypes& argument_types,
         : _argument_types(argument_types), _return_type(return_type), _tfn(fn) {}
 
 Status FunctionRPC::open(FunctionContext* context, FunctionContext::FunctionStateScope scope) {
-    _fn = std::make_unique<RPCFnImpl>(_tfn);
-
-    if (!_fn->available()) {
-        return Status::InternalError("rpc env init error");
+    if (scope == FunctionContext::FRAGMENT_LOCAL) {
+        std::shared_ptr<RPCFnImpl> fn = std::make_shared<RPCFnImpl>(_tfn);
+        if (!fn->available()) {
+            return Status::InternalError("rpc env init error");
+        }
+        context->set_function_state(FunctionContext::FRAGMENT_LOCAL, fn);
     }
     return Status::OK();
 }
 
 Status FunctionRPC::execute(FunctionContext* context, Block& block, const ColumnNumbers& arguments,
                             size_t result, size_t input_rows_count, bool dry_run) {
-    return _fn->vec_call(context, block, arguments, result, input_rows_count);
+    RPCFnImpl* fn = reinterpret_cast<RPCFnImpl*>(
+            context->get_function_state(FunctionContext::FRAGMENT_LOCAL));
+    return fn->vec_call(context, block, arguments, result, input_rows_count);
 }
 } // namespace doris::vectorized
diff --git a/be/src/vec/functions/function_rpc.h b/be/src/vec/functions/function_rpc.h
index 5623183470..d10b9be546 100644
--- a/be/src/vec/functions/function_rpc.h
+++ b/be/src/vec/functions/function_rpc.h
@@ -107,7 +107,6 @@ private:
     DataTypes _argument_types;
     DataTypePtr _return_type;
     TFunction _tfn;
-    std::unique_ptr<RPCFnImpl> _fn;
 };
 
 } // namespace doris::vectorized


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org