You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by yi...@apache.org on 2023/06/28 07:01:32 UTC
[doris] branch master updated: [Bug](javaudf) fix BE crash if javaudf is push down (#21139)
This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new a4fdf7324a [Bug](javaudf) fix BE crash if javaudf is push down (#21139)
a4fdf7324a is described below
commit a4fdf7324ae6faca6b33e54add38bd786cf8d3b7
Author: Gabriel <ga...@gmail.com>
AuthorDate: Wed Jun 28 15:01:24 2023 +0800
[Bug](javaudf) fix BE crash if javaudf is push down (#21139)
---
be/src/runtime/user_function_cache.cpp | 7 +-
be/src/vec/functions/function_java_udf.cpp | 232 +++++++++++++++--------------
be/src/vec/functions/function_java_udf.h | 23 +--
be/src/vec/functions/function_jsonb.cpp | 41 +++--
be/src/vec/functions/function_rpc.cpp | 14 +-
be/src/vec/functions/function_rpc.h | 1 -
6 files changed, 177 insertions(+), 141 deletions(-)
diff --git a/be/src/runtime/user_function_cache.cpp b/be/src/runtime/user_function_cache.cpp
index 25e7405a0f..f7ec0890a6 100644
--- a/be/src/runtime/user_function_cache.cpp
+++ b/be/src/runtime/user_function_cache.cpp
@@ -140,8 +140,9 @@ Status UserFunctionCache::_load_entry_from_lib(const std::string& dir, const std
}
std::vector<std::string> split_parts = strings::Split(file, ".");
- if (split_parts.size() != 3) {
- return Status::InternalError("user function's name should be function_id.checksum.so");
+ if (split_parts.size() != 3 && split_parts.size() != 4) {
+ return Status::InternalError(
+ "user function's name should be function_id.checksum[.file_name].file_type");
}
int64_t function_id = std::stol(split_parts[0]);
std::string checksum = split_parts[1];
@@ -176,7 +177,7 @@ Status UserFunctionCache::_load_cached_lib() {
auto st = _load_entry_from_lib(sub_dir, file.file_name);
if (!st.ok()) {
LOG(WARNING) << "load a library failed, dir=" << sub_dir
- << ", file=" << file.file_name;
+ << ", file=" << file.file_name << ": " << st.to_string();
}
return true;
};
diff --git a/be/src/vec/functions/function_java_udf.cpp b/be/src/vec/functions/function_java_udf.cpp
index fe96c92051..9305bae949 100644
--- a/be/src/vec/functions/function_java_udf.cpp
+++ b/be/src/vec/functions/function_java_udf.cpp
@@ -54,16 +54,24 @@ Status JavaFunctionCall::open(FunctionContext* context, FunctionContext::Functio
if (env == nullptr) {
return Status::InternalError("Failed to get/create JVM");
}
- RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, EXECUTOR_CLASS, &executor_cl_));
- executor_ctor_id_ = env->GetMethodID(executor_cl_, "<init>", EXECUTOR_CTOR_SIGNATURE);
- RETURN_ERROR_IF_EXC(env);
- executor_evaluate_id_ = env->GetMethodID(executor_cl_, "evaluate", EXECUTOR_EVALUATE_SIGNATURE);
- RETURN_ERROR_IF_EXC(env);
- executor_close_id_ = env->GetMethodID(executor_cl_, "close", EXECUTOR_CLOSE_SIGNATURE);
- RETURN_ERROR_IF_EXC(env);
-
- std::shared_ptr<JniContext> jni_ctx =
- std::make_shared<JniContext>(_argument_types.size(), this);
+ if (scope == FunctionContext::FunctionStateScope::FRAGMENT_LOCAL) {
+ std::shared_ptr<JniEnv> jni_env = std::make_shared<JniEnv>();
+ RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, EXECUTOR_CLASS, &jni_env->executor_cl));
+ jni_env->executor_ctor_id =
+ env->GetMethodID(jni_env->executor_cl, "<init>", EXECUTOR_CTOR_SIGNATURE);
+ RETURN_ERROR_IF_EXC(env);
+ jni_env->executor_evaluate_id =
+ env->GetMethodID(jni_env->executor_cl, "evaluate", EXECUTOR_EVALUATE_SIGNATURE);
+ RETURN_ERROR_IF_EXC(env);
+ jni_env->executor_close_id =
+ env->GetMethodID(jni_env->executor_cl, "close", EXECUTOR_CLOSE_SIGNATURE);
+ RETURN_ERROR_IF_EXC(env);
+ context->set_function_state(FunctionContext::FRAGMENT_LOCAL, jni_env);
+ }
+ JniEnv* jni_env =
+ reinterpret_cast<JniEnv*>(context->get_function_state(FunctionContext::FRAGMENT_LOCAL));
+ std::shared_ptr<JniContext> jni_ctx = std::make_shared<JniContext>(
+ _argument_types.size(), jni_env->executor_cl, jni_env->executor_close_id);
context->set_function_state(FunctionContext::THREAD_LOCAL, jni_ctx);
// Add a scoped cleanup jni reference object. This cleans up local refs made below.
@@ -99,7 +107,9 @@ Status JavaFunctionCall::open(FunctionContext* context, FunctionContext::Functio
RETURN_IF_ERROR(jni_frame.push(env));
RETURN_IF_ERROR(SerializeThriftMsg(env, &ctor_params, &ctor_params_bytes));
- jni_ctx->executor = env->NewObject(executor_cl_, executor_ctor_id_, ctor_params_bytes);
+
+ jni_ctx->executor =
+ env->NewObject(jni_env->executor_cl, jni_env->executor_ctor_id, ctor_params_bytes);
jbyte* pBytes = env->GetByteArrayElements(ctor_params_bytes, nullptr);
env->ReleaseByteArrayElements(ctor_params_bytes, pBytes, JNI_ABORT);
@@ -118,6 +128,8 @@ Status JavaFunctionCall::execute(FunctionContext* context, Block& block,
RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env));
JniContext* jni_ctx = reinterpret_cast<JniContext*>(
context->get_function_state(FunctionContext::THREAD_LOCAL));
+ JniEnv* jni_env =
+ reinterpret_cast<JniEnv*>(context->get_function_state(FunctionContext::FRAGMENT_LOCAL));
int arg_idx = 0;
ColumnPtr data_cols[arguments.size()];
ColumnPtr null_cols[arguments.size()];
@@ -192,105 +204,105 @@ Status JavaFunctionCall::execute(FunctionContext* context, Block& block,
*(jni_ctx->output_null_value) = reinterpret_cast<int64_t>(null_col->get_data().data());
#ifndef EVALUATE_JAVA_UDF
-#define EVALUATE_JAVA_UDF \
- if (data_col->is_column_string()) { \
- const ColumnString* str_col = assert_cast<const ColumnString*>(data_col.get()); \
- ColumnString::Chars& chars = const_cast<ColumnString::Chars&>(str_col->get_chars()); \
- ColumnString::Offsets& offsets = \
- const_cast<ColumnString::Offsets&>(str_col->get_offsets()); \
- int increase_buffer_size = 0; \
- int64_t buffer_size = JniUtil::IncreaseReservedBufferSize(increase_buffer_size); \
- chars.resize(buffer_size); \
- offsets.resize(num_rows); \
- *(jni_ctx->output_value_buffer) = reinterpret_cast<int64_t>(chars.data()); \
- *(jni_ctx->output_offsets_ptr) = reinterpret_cast<int64_t>(offsets.data()); \
- jni_ctx->output_intermediate_state_ptr->row_idx = 0; \
- jni_ctx->output_intermediate_state_ptr->buffer_size = buffer_size; \
- env->CallNonvirtualVoidMethodA(jni_ctx->executor, executor_cl_, executor_evaluate_id_, \
- nullptr); \
- while (jni_ctx->output_intermediate_state_ptr->row_idx < num_rows) { \
- increase_buffer_size++; \
- buffer_size = JniUtil::IncreaseReservedBufferSize(increase_buffer_size); \
- chars.resize(buffer_size); \
- *(jni_ctx->output_value_buffer) = reinterpret_cast<int64_t>(chars.data()); \
- jni_ctx->output_intermediate_state_ptr->buffer_size = buffer_size; \
- env->CallNonvirtualVoidMethodA(jni_ctx->executor, executor_cl_, executor_evaluate_id_, \
- nullptr); \
- } \
- } else if (data_col->is_numeric() || data_col->is_column_decimal()) { \
- data_col->resize(num_rows); \
- *(jni_ctx->output_value_buffer) = \
- reinterpret_cast<int64_t>(data_col->get_raw_data().data); \
- env->CallNonvirtualVoidMethodA(jni_ctx->executor, executor_cl_, executor_evaluate_id_, \
- nullptr); \
- } else if (data_col->is_column_array()) { \
- ColumnArray* array_col = assert_cast<ColumnArray*>(data_col.get()); \
- ColumnNullable& array_nested_nullable = \
- assert_cast<ColumnNullable&>(array_col->get_data()); \
- auto data_column_null_map = array_nested_nullable.get_null_map_column_ptr(); \
- auto data_column = array_nested_nullable.get_nested_column_ptr(); \
- auto& offset_column = array_col->get_offsets_column(); \
- int increase_buffer_size = 0; \
- int64_t buffer_size = JniUtil::IncreaseReservedBufferSize(increase_buffer_size); \
- offset_column.resize(num_rows); \
- *(jni_ctx->output_offsets_ptr) = \
- reinterpret_cast<int64_t>(offset_column.get_raw_data().data); \
- data_column_null_map->resize(buffer_size); \
- auto& null_map_data = \
- assert_cast<ColumnVector<UInt8>*>(data_column_null_map.get())->get_data(); \
- *(jni_ctx->output_array_null_ptr) = reinterpret_cast<int64_t>(null_map_data.data()); \
- jni_ctx->output_intermediate_state_ptr->row_idx = 0; \
- jni_ctx->output_intermediate_state_ptr->buffer_size = buffer_size; \
- if (data_column->is_column_string()) { \
- ColumnString* str_col = assert_cast<ColumnString*>(data_column.get()); \
- ColumnString::Chars& chars = assert_cast<ColumnString::Chars&>(str_col->get_chars()); \
- ColumnString::Offsets& offsets = \
- assert_cast<ColumnString::Offsets&>(str_col->get_offsets()); \
- chars.resize(buffer_size); \
- offsets.resize(buffer_size); \
- *(jni_ctx->output_value_buffer) = reinterpret_cast<int64_t>(chars.data()); \
- *(jni_ctx->output_array_string_offsets_ptr) = \
- reinterpret_cast<int64_t>(offsets.data()); \
- env->CallNonvirtualVoidMethodA(jni_ctx->executor, executor_cl_, executor_evaluate_id_, \
- nullptr); \
- while (jni_ctx->output_intermediate_state_ptr->row_idx < num_rows) { \
- increase_buffer_size++; \
- buffer_size = JniUtil::IncreaseReservedBufferSize(increase_buffer_size); \
- null_map_data.resize(buffer_size); \
- chars.resize(buffer_size); \
- offsets.resize(buffer_size); \
- *(jni_ctx->output_array_null_ptr) = \
- reinterpret_cast<int64_t>(null_map_data.data()); \
- *(jni_ctx->output_value_buffer) = reinterpret_cast<int64_t>(chars.data()); \
- *(jni_ctx->output_array_string_offsets_ptr) = \
- reinterpret_cast<int64_t>(offsets.data()); \
- jni_ctx->output_intermediate_state_ptr->buffer_size = buffer_size; \
- env->CallNonvirtualVoidMethodA(jni_ctx->executor, executor_cl_, \
- executor_evaluate_id_, nullptr); \
- } \
- } else { \
- data_column->resize(buffer_size); \
- *(jni_ctx->output_value_buffer) = \
- reinterpret_cast<int64_t>(data_column->get_raw_data().data); \
- env->CallNonvirtualVoidMethodA(jni_ctx->executor, executor_cl_, executor_evaluate_id_, \
- nullptr); \
- while (jni_ctx->output_intermediate_state_ptr->row_idx < num_rows) { \
- increase_buffer_size++; \
- buffer_size = JniUtil::IncreaseReservedBufferSize(increase_buffer_size); \
- null_map_data.resize(buffer_size); \
- data_column->resize(buffer_size); \
- *(jni_ctx->output_array_null_ptr) = \
- reinterpret_cast<int64_t>(null_map_data.data()); \
- *(jni_ctx->output_value_buffer) = \
- reinterpret_cast<int64_t>(data_column->get_raw_data().data); \
- jni_ctx->output_intermediate_state_ptr->buffer_size = buffer_size; \
- env->CallNonvirtualVoidMethodA(jni_ctx->executor, executor_cl_, \
- executor_evaluate_id_, nullptr); \
- } \
- } \
- } else { \
- return Status::InvalidArgument(strings::Substitute( \
- "Java UDF doesn't support return type $0 now !", return_type->get_name())); \
+#define EVALUATE_JAVA_UDF \
+ if (data_col->is_column_string()) { \
+ const ColumnString* str_col = assert_cast<const ColumnString*>(data_col.get()); \
+ ColumnString::Chars& chars = const_cast<ColumnString::Chars&>(str_col->get_chars()); \
+ ColumnString::Offsets& offsets = \
+ const_cast<ColumnString::Offsets&>(str_col->get_offsets()); \
+ int increase_buffer_size = 0; \
+ int64_t buffer_size = JniUtil::IncreaseReservedBufferSize(increase_buffer_size); \
+ chars.resize(buffer_size); \
+ offsets.resize(num_rows); \
+ *(jni_ctx->output_value_buffer) = reinterpret_cast<int64_t>(chars.data()); \
+ *(jni_ctx->output_offsets_ptr) = reinterpret_cast<int64_t>(offsets.data()); \
+ jni_ctx->output_intermediate_state_ptr->row_idx = 0; \
+ jni_ctx->output_intermediate_state_ptr->buffer_size = buffer_size; \
+ env->CallNonvirtualVoidMethodA(jni_ctx->executor, jni_env->executor_cl, \
+ jni_env->executor_evaluate_id, nullptr); \
+ while (jni_ctx->output_intermediate_state_ptr->row_idx < num_rows) { \
+ increase_buffer_size++; \
+ buffer_size = JniUtil::IncreaseReservedBufferSize(increase_buffer_size); \
+ chars.resize(buffer_size); \
+ *(jni_ctx->output_value_buffer) = reinterpret_cast<int64_t>(chars.data()); \
+ jni_ctx->output_intermediate_state_ptr->buffer_size = buffer_size; \
+ env->CallNonvirtualVoidMethodA(jni_ctx->executor, jni_env->executor_cl, \
+ jni_env->executor_evaluate_id, nullptr); \
+ } \
+ } else if (data_col->is_numeric() || data_col->is_column_decimal()) { \
+ data_col->resize(num_rows); \
+ *(jni_ctx->output_value_buffer) = \
+ reinterpret_cast<int64_t>(data_col->get_raw_data().data); \
+ env->CallNonvirtualVoidMethodA(jni_ctx->executor, jni_env->executor_cl, \
+ jni_env->executor_evaluate_id, nullptr); \
+ } else if (data_col->is_column_array()) { \
+ ColumnArray* array_col = assert_cast<ColumnArray*>(data_col.get()); \
+ ColumnNullable& array_nested_nullable = \
+ assert_cast<ColumnNullable&>(array_col->get_data()); \
+ auto data_column_null_map = array_nested_nullable.get_null_map_column_ptr(); \
+ auto data_column = array_nested_nullable.get_nested_column_ptr(); \
+ auto& offset_column = array_col->get_offsets_column(); \
+ int increase_buffer_size = 0; \
+ int64_t buffer_size = JniUtil::IncreaseReservedBufferSize(increase_buffer_size); \
+ offset_column.resize(num_rows); \
+ *(jni_ctx->output_offsets_ptr) = \
+ reinterpret_cast<int64_t>(offset_column.get_raw_data().data); \
+ data_column_null_map->resize(buffer_size); \
+ auto& null_map_data = \
+ assert_cast<ColumnVector<UInt8>*>(data_column_null_map.get())->get_data(); \
+ *(jni_ctx->output_array_null_ptr) = reinterpret_cast<int64_t>(null_map_data.data()); \
+ jni_ctx->output_intermediate_state_ptr->row_idx = 0; \
+ jni_ctx->output_intermediate_state_ptr->buffer_size = buffer_size; \
+ if (data_column->is_column_string()) { \
+ ColumnString* str_col = assert_cast<ColumnString*>(data_column.get()); \
+ ColumnString::Chars& chars = assert_cast<ColumnString::Chars&>(str_col->get_chars()); \
+ ColumnString::Offsets& offsets = \
+ assert_cast<ColumnString::Offsets&>(str_col->get_offsets()); \
+ chars.resize(buffer_size); \
+ offsets.resize(buffer_size); \
+ *(jni_ctx->output_value_buffer) = reinterpret_cast<int64_t>(chars.data()); \
+ *(jni_ctx->output_array_string_offsets_ptr) = \
+ reinterpret_cast<int64_t>(offsets.data()); \
+ env->CallNonvirtualVoidMethodA(jni_ctx->executor, jni_env->executor_cl, \
+ jni_env->executor_evaluate_id, nullptr); \
+ while (jni_ctx->output_intermediate_state_ptr->row_idx < num_rows) { \
+ increase_buffer_size++; \
+ buffer_size = JniUtil::IncreaseReservedBufferSize(increase_buffer_size); \
+ null_map_data.resize(buffer_size); \
+ chars.resize(buffer_size); \
+ offsets.resize(buffer_size); \
+ *(jni_ctx->output_array_null_ptr) = \
+ reinterpret_cast<int64_t>(null_map_data.data()); \
+ *(jni_ctx->output_value_buffer) = reinterpret_cast<int64_t>(chars.data()); \
+ *(jni_ctx->output_array_string_offsets_ptr) = \
+ reinterpret_cast<int64_t>(offsets.data()); \
+ jni_ctx->output_intermediate_state_ptr->buffer_size = buffer_size; \
+ env->CallNonvirtualVoidMethodA(jni_ctx->executor, jni_env->executor_cl, \
+ jni_env->executor_evaluate_id, nullptr); \
+ } \
+ } else { \
+ data_column->resize(buffer_size); \
+ *(jni_ctx->output_value_buffer) = \
+ reinterpret_cast<int64_t>(data_column->get_raw_data().data); \
+ env->CallNonvirtualVoidMethodA(jni_ctx->executor, jni_env->executor_cl, \
+ jni_env->executor_evaluate_id, nullptr); \
+ while (jni_ctx->output_intermediate_state_ptr->row_idx < num_rows) { \
+ increase_buffer_size++; \
+ buffer_size = JniUtil::IncreaseReservedBufferSize(increase_buffer_size); \
+ null_map_data.resize(buffer_size); \
+ data_column->resize(buffer_size); \
+ *(jni_ctx->output_array_null_ptr) = \
+ reinterpret_cast<int64_t>(null_map_data.data()); \
+ *(jni_ctx->output_value_buffer) = \
+ reinterpret_cast<int64_t>(data_column->get_raw_data().data); \
+ jni_ctx->output_intermediate_state_ptr->buffer_size = buffer_size; \
+ env->CallNonvirtualVoidMethodA(jni_ctx->executor, jni_env->executor_cl, \
+ jni_env->executor_evaluate_id, nullptr); \
+ } \
+ } \
+ } else { \
+ return Status::InvalidArgument(strings::Substitute( \
+ "Java UDF doesn't support return type $0 now !", return_type->get_name())); \
}
#endif
EVALUATE_JAVA_UDF;
diff --git a/be/src/vec/functions/function_java_udf.h b/be/src/vec/functions/function_java_udf.h
index 1f47394c69..605d7c0198 100644
--- a/be/src/vec/functions/function_java_udf.h
+++ b/be/src/vec/functions/function_java_udf.h
@@ -23,6 +23,7 @@
#include <stdint.h>
#include <memory>
+#include <mutex>
#include <ostream>
#include "common/logging.h"
@@ -83,13 +84,6 @@ private:
const DataTypes _argument_types;
const DataTypePtr _return_type;
- /// Global class reference to the UdfExecutor Java class and related method IDs. Set in
- /// Init(). These have the lifetime of the process (i.e. 'executor_cl_' is never freed).
- jclass executor_cl_;
- jmethodID executor_ctor_id_;
- jmethodID executor_evaluate_id_;
- jmethodID executor_close_id_;
-
struct IntermediateState {
size_t buffer_size;
size_t row_idx;
@@ -97,6 +91,15 @@ private:
IntermediateState() : buffer_size(0), row_idx(0) {}
};
+ struct JniEnv {
+ /// Global class reference to the UdfExecutor Java class and related method IDs. Set in
+ /// Init(). These have the lifetime of the process (i.e. 'executor_cl_' is never freed).
+ jclass executor_cl;
+ jmethodID executor_ctor_id;
+ jmethodID executor_evaluate_id;
+ jmethodID executor_close_id;
+ };
+
struct JniContext {
// Do not save parent directly, because parent is in VExpr, but jni context is in FunctionContext
// The deconstruct sequence is not determined, it will core.
@@ -124,9 +127,9 @@ private:
// intermediate_state includes two parts: reserved / used buffer size and rows
std::unique_ptr<IntermediateState> output_intermediate_state_ptr;
- JniContext(int64_t num_args, JavaFunctionCall* parent)
- : executor_cl_(parent->executor_cl_),
- executor_close_id_(parent->executor_close_id_),
+ JniContext(int64_t num_args, jclass executor_cl, jmethodID executor_close_id)
+ : executor_cl_(executor_cl),
+ executor_close_id_(executor_close_id),
input_values_buffer_ptr(new int64_t[num_args]),
input_nulls_buffer_ptr(new int64_t[num_args]),
input_offsets_ptrs(new int64_t[num_args]),
diff --git a/be/src/vec/functions/function_jsonb.cpp b/be/src/vec/functions/function_jsonb.cpp
index fbe5f11313..ac62c59ef7 100644
--- a/be/src/vec/functions/function_jsonb.cpp
+++ b/be/src/vec/functions/function_jsonb.cpp
@@ -78,8 +78,10 @@ enum class JsonbParseErrorMode { FAIL = 0, RETURN_NULL, RETURN_VALUE, RETURN_INV
template <NullalbeMode nullable_mode, JsonbParseErrorMode parse_error_handle_mode>
class FunctionJsonbParseBase : public IFunction {
private:
- JsonbParser default_value_parser;
- bool has_const_default_value = false;
+ struct FunctionJsonbParseState {
+ JsonbParser default_value_parser;
+ bool has_const_default_value = false;
+ };
public:
static constexpr auto name = "json_parse";
@@ -152,20 +154,31 @@ public:
bool use_default_implementation_for_nulls() const override { return false; }
Status open(FunctionContext* context, FunctionContext::FunctionStateScope scope) override {
+ if (scope == FunctionContext::FunctionStateScope::FRAGMENT_LOCAL) {
+ std::shared_ptr<FunctionJsonbParseState> state =
+ std::make_shared<FunctionJsonbParseState>();
+ context->set_function_state(FunctionContext::FRAGMENT_LOCAL, state);
+ }
if constexpr (parse_error_handle_mode == JsonbParseErrorMode::RETURN_VALUE) {
if (context->is_col_constant(1)) {
const auto default_value_col = context->get_constant_col(1)->column_ptr;
const auto& default_value = default_value_col->get_data_at(0);
JsonbErrType error = JsonbErrType::E_NONE;
- if (!default_value_parser.parse(default_value.data, default_value.size)) {
- error = default_value_parser.getErrorCode();
- return Status::InvalidArgument(
- "invalid default json value: {} , error: {}",
- std::string_view(default_value.data, default_value.size),
- JsonbErrMsg::getErrMsg(error));
+ if (scope == FunctionContext::FunctionStateScope::FRAGMENT_LOCAL) {
+ FunctionJsonbParseState* state = reinterpret_cast<FunctionJsonbParseState*>(
+ context->get_function_state(FunctionContext::FRAGMENT_LOCAL));
+
+ if (!state->default_value_parser.parse(default_value.data,
+ default_value.size)) {
+ error = state->default_value_parser.getErrorCode();
+ return Status::InvalidArgument(
+ "invalid default json value: {} , error: {}",
+ std::string_view(default_value.data, default_value.size),
+ JsonbErrMsg::getErrMsg(error));
+ }
+ state->has_const_default_value = true;
}
- has_const_default_value = true;
}
}
return Status::OK();
@@ -257,10 +270,14 @@ public:
continue;
}
case JsonbParseErrorMode::RETURN_VALUE: {
- if (has_const_default_value) {
+ FunctionJsonbParseState* state = reinterpret_cast<FunctionJsonbParseState*>(
+ context->get_function_state(FunctionContext::FRAGMENT_LOCAL));
+ if (state->has_const_default_value) {
col_to->insert_data(
- default_value_parser.getWriter().getOutput()->getBuffer(),
- (size_t)default_value_parser.getWriter().getOutput()->getSize());
+ state->default_value_parser.getWriter().getOutput()->getBuffer(),
+ (size_t)state->default_value_parser.getWriter()
+ .getOutput()
+ ->getSize());
} else {
auto val = block.get_by_position(arguments[1]).column->get_data_at(i);
if (parser.parse(val.data, val.size)) {
diff --git a/be/src/vec/functions/function_rpc.cpp b/be/src/vec/functions/function_rpc.cpp
index dbd92eeec2..44c5e1b369 100644
--- a/be/src/vec/functions/function_rpc.cpp
+++ b/be/src/vec/functions/function_rpc.cpp
@@ -90,16 +90,20 @@ FunctionRPC::FunctionRPC(const TFunction& fn, const DataTypes& argument_types,
: _argument_types(argument_types), _return_type(return_type), _tfn(fn) {}
Status FunctionRPC::open(FunctionContext* context, FunctionContext::FunctionStateScope scope) {
- _fn = std::make_unique<RPCFnImpl>(_tfn);
-
- if (!_fn->available()) {
- return Status::InternalError("rpc env init error");
+ if (scope == FunctionContext::FRAGMENT_LOCAL) {
+ std::shared_ptr<RPCFnImpl> fn = std::make_shared<RPCFnImpl>(_tfn);
+ if (!fn->available()) {
+ return Status::InternalError("rpc env init error");
+ }
+ context->set_function_state(FunctionContext::FRAGMENT_LOCAL, fn);
}
return Status::OK();
}
Status FunctionRPC::execute(FunctionContext* context, Block& block, const ColumnNumbers& arguments,
size_t result, size_t input_rows_count, bool dry_run) {
- return _fn->vec_call(context, block, arguments, result, input_rows_count);
+ RPCFnImpl* fn = reinterpret_cast<RPCFnImpl*>(
+ context->get_function_state(FunctionContext::FRAGMENT_LOCAL));
+ return fn->vec_call(context, block, arguments, result, input_rows_count);
}
} // namespace doris::vectorized
diff --git a/be/src/vec/functions/function_rpc.h b/be/src/vec/functions/function_rpc.h
index 5623183470..d10b9be546 100644
--- a/be/src/vec/functions/function_rpc.h
+++ b/be/src/vec/functions/function_rpc.h
@@ -107,7 +107,6 @@ private:
DataTypes _argument_types;
DataTypePtr _return_type;
TFunction _tfn;
- std::unique_ptr<RPCFnImpl> _fn;
};
} // namespace doris::vectorized
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org