You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by GitBox <gi...@apache.org> on 2022/06/02 02:36:36 UTC

[GitHub] [incubator-doris] zhangstar333 opened a new pull request, #9930: [Vectorized][UDF] support java-udaf

zhangstar333 opened a new pull request, #9930:
URL: https://github.com/apache/incubator-doris/pull/9930

   # Proposed changes
   
   Issue Number: close #8389
   
   ## Problem Summary:
   
   Describe the overview of changes.
   
   ## Checklist(Required)
   
   1. Does it affect the original behavior: (Yes/No/I Don't know)
   2. Have unit tests been added: (Yes/No/No Need)
   3. Has documentation been added or modified: (Yes/No/No Need)
   4. Does it need to update dependencies: (Yes/No)
   5. Are there any changes that cannot be rolled back: (Yes/No)
   
   ## Further comments
   
   If this is a relatively large or complex change, kick off the discussion at [dev@doris.apache.org](mailto:dev@doris.apache.org) by explaining why you chose the solution you did and what alternatives you considered, etc...
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] HappenLee commented on pull request #9930: [Vectorized][UDF] support java-udaf

Posted by GitBox <gi...@apache.org>.
HappenLee commented on PR #9930:
URL: https://github.com/apache/incubator-doris/pull/9930#issuecomment-1146521590

   Please also modify the documentation.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] Gabriel39 commented on a diff in pull request #9930: [Vectorized][UDF] support java-udaf

Posted by GitBox <gi...@apache.org>.
Gabriel39 commented on code in PR #9930:
URL: https://github.com/apache/incubator-doris/pull/9930#discussion_r896325774


##########
be/src/vec/aggregate_functions/aggregate_function_java_udaf.h:
##########
@@ -0,0 +1,358 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#ifdef LIBJVM
+
+#include <jni.h>
+#include <unistd.h>
+
+#include <cstdint>
+#include <memory>
+
+#include "common/status.h"
+#include "gen_cpp/Exprs_types.h"
+#include "runtime/user_function_cache.h"
+#include "util/jni-util.h"
+#include "vec/aggregate_functions/aggregate_function.h"
+#include "vec/columns/column_string.h"
+#include "vec/common/exception.h"
+#include "vec/common/string_ref.h"
+#include "vec/core/block.h"
+#include "vec/core/column_numbers.h"
+#include "vec/core/field.h"
+#include "vec/core/types.h"
+#include "vec/data_types/data_type_string.h"
+#include "vec/io/io_helper.h"
+
+namespace doris::vectorized {
+
+const char* UDAF_EXECUTOR_CLASS = "org/apache/doris/udf/UdafExecutor";  // NOTE(review): non-const pointers defined at namespace scope in a header have external linkage; including this header from more than one TU gives duplicate definitions — consider static/inline constexpr.
+const char* UDAF_EXECUTOR_CTOR_SIGNATURE = "([B)V";
+const char* UDAF_EXECUTOR_CLOSE_SIGNATURE = "()V";
+const char* UDAF_EXECUTOR_ADD_SIGNATURE = "(JJ)V";
+const char* UDAF_EXECUTOR_SERIALIZE_SIGNATURE = "()[B";
+const char* UDAF_EXECUTOR_MERGE_SIGNATURE = "([B)V";
+const char* UDAF_EXECUTOR_RESULT_SIGNATURE = "(J)Z";
+// The JNI method signatures above use the form "(argument-types)return-type".
+// https://www.iitk.ac.in/esc101/05Aug/tutorial/native1.1/implementing/method.html
+
+struct AggregateJavaUdafData {
+public:
+    AggregateJavaUdafData() = default;
+    AggregateJavaUdafData(int64_t num_args) {
+        argument_size = num_args;
+        first_init = true;
+        input_values_buffer_ptr.reset(new int64_t[num_args]);
+        input_nulls_buffer_ptr.reset(new int64_t[num_args]);
+        input_offsets_ptrs.reset(new int64_t[num_args]);
+        output_value_buffer.reset(new int64_t);
+        output_null_value.reset(new int64_t);
+        output_offsets_ptr.reset(new int64_t);
+        output_intermediate_state_ptr.reset(new int64_t);
+    }
+
+    ~AggregateJavaUdafData() {
+        JNIEnv* env;
+        Status status;
+        RETURN_IF_STATUS_ERROR(status, JniUtil::GetJNIEnv(&env));
+        env->CallNonvirtualVoidMethod(executor_obj, executor_cl, executor_close_id);
+        RETURN_IF_STATUS_ERROR(status, JniUtil::GetJniExceptionMsg(env));
+        env->DeleteGlobalRef(executor_obj);
+    }
+
+    Status init_udaf(const TFunction& fn) {
+        if (first_init) {
+            JNIEnv* env = nullptr;
+            RETURN_NOT_OK_STATUS_WITH_WARN(JniUtil::GetJNIEnv(&env),
+                                           "Java-Udaf init_udaf function");
+            RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, UDAF_EXECUTOR_CLASS, &executor_cl));
+            RETURN_NOT_OK_STATUS_WITH_WARN(register_func_id(env),
+                                           "Java-Udaf register_func_id function");
+
+            // Add a scoped cleanup jni reference object. This cleans up local refs made below.
+            JniLocalFrame jni_frame;
+            {
+                std::string local_location;
+                auto function_cache = UserFunctionCache::instance();
+                RETURN_IF_ERROR(function_cache->get_jarpath(fn.id, fn.hdfs_location, fn.checksum,
+                                                            &local_location));
+                TJavaUdfExecutorCtorParams ctor_params;
+                ctor_params.__set_fn(fn);
+                ctor_params.__set_location(local_location);
+                ctor_params.__set_input_offsets_ptrs((int64_t)input_offsets_ptrs.get());
+                ctor_params.__set_input_buffer_ptrs((int64_t)input_values_buffer_ptr.get());
+                ctor_params.__set_input_nulls_ptrs((int64_t)input_nulls_buffer_ptr.get());
+                ctor_params.__set_output_buffer_ptr((int64_t)output_value_buffer.get());
+
+                ctor_params.__set_output_null_ptr((int64_t)output_null_value.get());
+                ctor_params.__set_output_offsets_ptr((int64_t)output_offsets_ptr.get());
+                ctor_params.__set_output_intermediate_state_ptr(
+                        (int64_t)output_intermediate_state_ptr.get());
+
+                jbyteArray ctor_params_bytes;
+
+                // Pushed frame will be popped when jni_frame goes out-of-scope.
+                RETURN_IF_ERROR(jni_frame.push(env));
+                RETURN_IF_ERROR(SerializeThriftMsg(env, &ctor_params, &ctor_params_bytes));
+                executor_obj = env->NewObject(executor_cl, executor_ctor_id, ctor_params_bytes);
+            }
+            RETURN_ERROR_IF_EXC(env);
+            RETURN_IF_ERROR(JniUtil::LocalToGlobalRef(env, executor_obj, &executor_obj));
+            first_init = false;
+        }
+        return Status::OK();
+    }
+
+    Status add(const IColumn** columns, size_t row_num_start, size_t row_num_end,
+               const DataTypes& argument_types) {
+        JNIEnv* env = nullptr;
+        RETURN_NOT_OK_STATUS_WITH_WARN(JniUtil::GetJNIEnv(&env), "Java-Udaf add function");
+        for (int arg_idx = 0; arg_idx < argument_size; ++arg_idx) {
+            auto data_col = columns[arg_idx];
+            if (auto* nullable = check_and_get_column<const ColumnNullable>(*columns[arg_idx])) {
+                data_col = nullable->get_nested_column_ptr();
+                auto null_col = check_and_get_column<ColumnVector<UInt8>>(
+                        nullable->get_null_map_column_ptr());
+                input_nulls_buffer_ptr.get()[arg_idx] =
+                        reinterpret_cast<int64_t>(null_col->get_data().data());
+            } else {
+                input_nulls_buffer_ptr.get()[arg_idx] = -1;
+            }
+            if (data_col->is_column_string()) {
+                const ColumnString* str_col = check_and_get_column<ColumnString>(data_col);
+                input_values_buffer_ptr.get()[arg_idx] =
+                        reinterpret_cast<int64_t>(str_col->get_chars().data());
+                input_offsets_ptrs.get()[arg_idx] =
+                        reinterpret_cast<int64_t>(str_col->get_offsets().data());
+            } else if (data_col->is_numeric() || data_col->is_column_decimal()) {
+                input_values_buffer_ptr.get()[arg_idx] =
+                        reinterpret_cast<int64_t>(data_col->get_raw_data().data);
+            } else {
+                return Status::InvalidArgument(
+                        strings::Substitute("Java UDAF doesn't support type is $0 now !",
+                                            argument_types[arg_idx]->get_name()));
+            }
+        }
+        env->CallNonvirtualVoidMethod(executor_obj, executor_cl, executor_add_id, row_num_start,
+                                      row_num_end);
+        return JniUtil::GetJniExceptionMsg(env);
+    }
+
+    Status merge(const AggregateJavaUdafData& rhs) {
+        JNIEnv* env = nullptr;
+        RETURN_NOT_OK_STATUS_WITH_WARN(JniUtil::GetJNIEnv(&env), "Java-Udaf merge function");
+        serialize_data = rhs.serialize_data;
+        long len = serialize_data.length();
+        jbyteArray arr = env->NewByteArray(len);
+        env->SetByteArrayRegion(arr, 0, len, reinterpret_cast<jbyte*>(serialize_data.data()));
+        env->CallNonvirtualVoidMethod(executor_obj, executor_cl, executor_merge_id, arr);
+        return JniUtil::GetJniExceptionMsg(env);
+    }
+
+    Status write(BufferWritable& buf) {
+        write_binary(first_init, buf);
+        JNIEnv* env = nullptr;
+        RETURN_NOT_OK_STATUS_WITH_WARN(JniUtil::GetJNIEnv(&env), "Java-Udaf write function");
+        // TODO: Here get a byte[] from FE serialize, and then allocate the same length bytes to
+        // save it in BE, Because i'm not sure there is a way to use the byte[] not allocate again.
+        jbyteArray arr = (jbyteArray)(env->CallNonvirtualObjectMethod(executor_obj, executor_cl,
+                                                                      executor_serialize_id));
+        int len = env->GetArrayLength(arr);
+        serialize_data.resize(len);

Review Comment:
   call `serialize_data` before using it



##########
be/src/vec/aggregate_functions/aggregate_function_java_udaf.h:
##########
@@ -0,0 +1,358 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#ifdef LIBJVM
+
+#include <jni.h>
+#include <unistd.h>
+
+#include <cstdint>
+#include <memory>
+
+#include "common/status.h"
+#include "gen_cpp/Exprs_types.h"
+#include "runtime/user_function_cache.h"
+#include "util/jni-util.h"
+#include "vec/aggregate_functions/aggregate_function.h"
+#include "vec/columns/column_string.h"
+#include "vec/common/exception.h"
+#include "vec/common/string_ref.h"
+#include "vec/core/block.h"
+#include "vec/core/column_numbers.h"
+#include "vec/core/field.h"
+#include "vec/core/types.h"
+#include "vec/data_types/data_type_string.h"
+#include "vec/io/io_helper.h"
+
+namespace doris::vectorized {
+
+const char* UDAF_EXECUTOR_CLASS = "org/apache/doris/udf/UdafExecutor";  // NOTE(review): non-const pointers defined at namespace scope in a header have external linkage; including this header from more than one TU gives duplicate definitions — consider static/inline constexpr.
+const char* UDAF_EXECUTOR_CTOR_SIGNATURE = "([B)V";
+const char* UDAF_EXECUTOR_CLOSE_SIGNATURE = "()V";
+const char* UDAF_EXECUTOR_ADD_SIGNATURE = "(JJ)V";
+const char* UDAF_EXECUTOR_SERIALIZE_SIGNATURE = "()[B";
+const char* UDAF_EXECUTOR_MERGE_SIGNATURE = "([B)V";
+const char* UDAF_EXECUTOR_RESULT_SIGNATURE = "(J)Z";
+// The JNI method signatures above use the form "(argument-types)return-type".
+// https://www.iitk.ac.in/esc101/05Aug/tutorial/native1.1/implementing/method.html
+
+struct AggregateJavaUdafData {  // Aggregation state used as the Data of AggregateJavaUdaf: holds a Java UdafExecutor object plus the int64 slots whose addresses are passed to it in TJavaUdfExecutorCtorParams.
+public:
+    AggregateJavaUdafData() = default;  // NOTE(review): leaves first_init, executor_cl/executor_obj and the method ids uninitialized, yet the destructor uses executor_obj unconditionally.
+    AggregateJavaUdafData(int64_t num_args) {  // NOTE(review): single-argument ctor is not explicit.
+        argument_size = num_args;
+        first_init = true;
+        input_values_buffer_ptr.reset(new int64_t[num_args]);  // one slot per argument; add() stores each column's data address here
+        input_nulls_buffer_ptr.reset(new int64_t[num_args]);
+        input_offsets_ptrs.reset(new int64_t[num_args]);
+        output_value_buffer.reset(new int64_t);  // single result slots; get() writes target buffer addresses/sizes into them
+        output_null_value.reset(new int64_t);
+        output_offsets_ptr.reset(new int64_t);
+        output_intermediate_state_ptr.reset(new int64_t);
+    }
+
+    ~AggregateJavaUdafData() {  // Calls Java close() and deletes the executor global ref. NOTE(review): executor_obj is only assigned in init_udaf, so this runs on an unset handle if init never succeeded; if RETURN_IF_STATUS_ERROR returns on error (confirm), DeleteGlobalRef is skipped; executor_cl is never released here — confirm ownership.
+        JNIEnv* env;  // NOTE(review): uninitialized, unlike the `= nullptr` used in the other methods
+        Status status;
+        RETURN_IF_STATUS_ERROR(status, JniUtil::GetJNIEnv(&env));
+        env->CallNonvirtualVoidMethod(executor_obj, executor_cl, executor_close_id);
+        RETURN_IF_STATUS_ERROR(status, JniUtil::GetJniExceptionMsg(env));
+        env->DeleteGlobalRef(executor_obj);
+    }
+
+    Status init_udaf(const TFunction& fn) {  // While first_init is true: resolves the executor class and method ids, then constructs the Java executor for fn.
+        if (first_init) {
+            JNIEnv* env = nullptr;
+            RETURN_NOT_OK_STATUS_WITH_WARN(JniUtil::GetJNIEnv(&env),
+                                           "Java-Udaf init_udaf function");
+            RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, UDAF_EXECUTOR_CLASS, &executor_cl));
+            RETURN_NOT_OK_STATUS_WITH_WARN(register_func_id(env),
+                                           "Java-Udaf register_func_id function");
+
+            // Add a scoped cleanup jni reference object. This cleans up local refs made below.
+            JniLocalFrame jni_frame;
+            {
+                std::string local_location;
+                auto function_cache = UserFunctionCache::instance();
+                RETURN_IF_ERROR(function_cache->get_jarpath(fn.id, fn.hdfs_location, fn.checksum,
+                                                            &local_location));
+                TJavaUdfExecutorCtorParams ctor_params;
+                ctor_params.__set_fn(fn);
+                ctor_params.__set_location(local_location);
+                ctor_params.__set_input_offsets_ptrs((int64_t)input_offsets_ptrs.get());
+                ctor_params.__set_input_buffer_ptrs((int64_t)input_values_buffer_ptr.get());
+                ctor_params.__set_input_nulls_ptrs((int64_t)input_nulls_buffer_ptr.get());
+                ctor_params.__set_output_buffer_ptr((int64_t)output_value_buffer.get());
+
+                ctor_params.__set_output_null_ptr((int64_t)output_null_value.get());
+                ctor_params.__set_output_offsets_ptr((int64_t)output_offsets_ptr.get());
+                ctor_params.__set_output_intermediate_state_ptr(
+                        (int64_t)output_intermediate_state_ptr.get());
+
+                jbyteArray ctor_params_bytes;
+
+                // Pushed frame will be popped when jni_frame goes out-of-scope.
+                RETURN_IF_ERROR(jni_frame.push(env));
+                RETURN_IF_ERROR(SerializeThriftMsg(env, &ctor_params, &ctor_params_bytes));
+                executor_obj = env->NewObject(executor_cl, executor_ctor_id, ctor_params_bytes);
+            }
+            RETURN_ERROR_IF_EXC(env);
+            RETURN_IF_ERROR(JniUtil::LocalToGlobalRef(env, executor_obj, &executor_obj));  // replace the local ref with a global one (deleted in the destructor)
+            first_init = false;
+        }
+        return Status::OK();
+    }
+
+    Status add(const IColumn** columns, size_t row_num_start, size_t row_num_end,  // Publishes each argument column's data/null-map/offsets addresses, then calls Java add(row_num_start, row_num_end).
+               const DataTypes& argument_types) {
+        JNIEnv* env = nullptr;
+        RETURN_NOT_OK_STATUS_WITH_WARN(JniUtil::GetJNIEnv(&env), "Java-Udaf add function");
+        for (int arg_idx = 0; arg_idx < argument_size; ++arg_idx) {
+            auto data_col = columns[arg_idx];
+            if (auto* nullable = check_and_get_column<const ColumnNullable>(*columns[arg_idx])) {
+                data_col = nullable->get_nested_column_ptr();
+                auto null_col = check_and_get_column<ColumnVector<UInt8>>(
+                        nullable->get_null_map_column_ptr());
+                input_nulls_buffer_ptr.get()[arg_idx] =
+                        reinterpret_cast<int64_t>(null_col->get_data().data());  // NOTE(review): null_col is dereferenced without a null check
+            } else {
+                input_nulls_buffer_ptr.get()[arg_idx] = -1;  // -1: this argument column is not nullable
+            }
+            if (data_col->is_column_string()) {
+                const ColumnString* str_col = check_and_get_column<ColumnString>(data_col);
+                input_values_buffer_ptr.get()[arg_idx] =
+                        reinterpret_cast<int64_t>(str_col->get_chars().data());
+                input_offsets_ptrs.get()[arg_idx] =
+                        reinterpret_cast<int64_t>(str_col->get_offsets().data());
+            } else if (data_col->is_numeric() || data_col->is_column_decimal()) {
+                input_values_buffer_ptr.get()[arg_idx] =
+                        reinterpret_cast<int64_t>(data_col->get_raw_data().data);
+            } else {
+                return Status::InvalidArgument(
+                        strings::Substitute("Java UDAF doesn't support type is $0 now !",
+                                            argument_types[arg_idx]->get_name()));
+            }
+        }
+        env->CallNonvirtualVoidMethod(executor_obj, executor_cl, executor_add_id, row_num_start,
+                                      row_num_end);
+        return JniUtil::GetJniExceptionMsg(env);
+    }
+
+    Status merge(const AggregateJavaUdafData& rhs) {  // Passes rhs.serialize_data (set by read()/write()) to Java merge(byte[]). NOTE(review): copies rhs's bytes into this object first; the local jbyteArray is neither null-checked nor DeleteLocalRef'd.
+        JNIEnv* env = nullptr;
+        RETURN_NOT_OK_STATUS_WITH_WARN(JniUtil::GetJNIEnv(&env), "Java-Udaf merge function");
+        serialize_data = rhs.serialize_data;
+        long len = serialize_data.length();
+        jbyteArray arr = env->NewByteArray(len);
+        env->SetByteArrayRegion(arr, 0, len, reinterpret_cast<jbyte*>(serialize_data.data()));
+        env->CallNonvirtualVoidMethod(executor_obj, executor_cl, executor_merge_id, arr);
+        return JniUtil::GetJniExceptionMsg(env);
+    }
+
+    Status write(BufferWritable& buf) {  // Writes first_init, then the bytes returned by Java serialize(), into buf.
+        write_binary(first_init, buf);
+        JNIEnv* env = nullptr;
+        RETURN_NOT_OK_STATUS_WITH_WARN(JniUtil::GetJNIEnv(&env), "Java-Udaf write function");
+        // TODO: the Java executor's serialize() returns a byte[]; we then allocate the same number of
+        // bytes in BE and copy it, because it is unclear whether the byte[] can be used without copying.
+        jbyteArray arr = (jbyteArray)(env->CallNonvirtualObjectMethod(executor_obj, executor_cl,
+                                                                      executor_serialize_id));
+        int len = env->GetArrayLength(arr);  // NOTE(review): arr is used before any pending-exception/null check (that check only happens on return); its local ref is never deleted
+        serialize_data.resize(len);
+        env->GetByteArrayRegion(arr, 0, len, reinterpret_cast<jbyte*>(serialize_data.data()));
+        write_binary(serialize_data, buf);
+        return JniUtil::GetJniExceptionMsg(env);
+    }
+
+    void read(BufferReadable& buf) {  // Reverse of write(): restores first_init and serialize_data.
+        read_binary(first_init, buf);
+        read_binary(serialize_data, buf);
+    }
+
+    Status get(IColumn& to, const DataTypePtr& result_type) const {  // Appends a default row to `to`, points the Java side at its buffers (and null map if nullable), and calls Java getValue(row).
+        to.insert_default();
+        JNIEnv* env = nullptr;
+        RETURN_NOT_OK_STATUS_WITH_WARN(JniUtil::GetJNIEnv(&env), "Java-Udaf get value function");
+        if (result_type->is_nullable()) {
+            auto& nullable = assert_cast<ColumnNullable&>(to);
+            *output_null_value =
+                    reinterpret_cast<int64_t>(nullable.get_null_map_column().get_raw_data().data);
+            auto& data_col = nullable.get_nested_column();
+
+#ifndef EVALUATE_JAVA_UDAF  // NOTE(review): macro defined inside a member function and not #undef'd in this struct; a private helper would scope it
+#define EVALUATE_JAVA_UDAF                                                                        \
+    if (data_col.is_column_string()) {                                                            \
+        const ColumnString* str_col = check_and_get_column<ColumnString>(data_col);               \
+        ColumnString::Chars& chars = const_cast<ColumnString::Chars&>(str_col->get_chars());      \
+        ColumnString::Offsets& offsets =                                                          \
+                const_cast<ColumnString::Offsets&>(str_col->get_offsets());                       \
+        int increase_buffer_size = 0;                                                             \
+        *output_value_buffer = reinterpret_cast<int64_t>(chars.data());                           \
+        *output_offsets_ptr = reinterpret_cast<int64_t>(offsets.data());                          \
+        *output_intermediate_state_ptr = chars.size();                                            \
+        jboolean res = env->CallNonvirtualBooleanMethod(executor_obj, executor_cl,                \
+                                                        executor_result_id, to.size() - 1);       \
+        while (res != JNI_TRUE) {                                                                 \
+            int32_t buffer_size = JniUtil::IncreaseReservedBufferSize(increase_buffer_size);      \
+            increase_buffer_size++;                                                               \
+            chars.reserve(chars.size() + buffer_size);                                            \
+            chars.resize(chars.size() + buffer_size);                                             \
+            *output_intermediate_state_ptr = chars.size();                                        \
+            res = env->CallNonvirtualBooleanMethod(executor_obj, executor_cl, executor_result_id, \
+                                                   to.size() - 1);                                \
+        }                                                                                         \
+    } else if (data_col.is_numeric() || data_col.is_column_decimal()) {                           \
+        *output_value_buffer = reinterpret_cast<int64_t>(data_col.get_raw_data().data);           \
+        env->CallNonvirtualBooleanMethod(executor_obj, executor_cl, executor_result_id,           \
+                                         to.size() - 1);                                          \
+    } else {                                                                                      \
+        return Status::InvalidArgument(strings::Substitute(                                       \
+                "Java UDAF doesn't support return type is $0 now !", result_type->get_name()));   \
+    }
+#endif  // EVALUATE_JAVA_UDAF
+            EVALUATE_JAVA_UDAF;  // NOTE(review): string path sets *output_value_buffer once before its retry loop; chars.resize() there may reallocate, leaving Java a stale address
+        } else {
+            *output_null_value = -1;
+            *output_value_buffer = reinterpret_cast<int64_t>(to.get_raw_data().data);  // NOTE(review): overwritten by EVALUATE_JAVA_UDAF in both supported branches
+            auto& data_col = to;
+            EVALUATE_JAVA_UDAF;
+            env->CallNonvirtualBooleanMethod(executor_obj, executor_cl, executor_result_id,  // NOTE(review): EVALUATE_JAVA_UDAF already called getValue for this row; this second call has no counterpart in the nullable branch
+                                             to.size() - 1);
+        }
+        return JniUtil::GetJniExceptionMsg(env);
+    }
+
+private:
+    Status register_func_id(JNIEnv* env) {  // Looks up the jmethodID of each executor method used here; any JNI exception becomes an InternalError.
+        auto register_id = [&](const char* func_name, const char* func_sign, jmethodID& func_id) {
+            func_id = env->GetMethodID(executor_cl, func_name, func_sign);
+            Status s = JniUtil::GetJniExceptionMsg(env);
+            if (!s.ok()) {
+                return Status::InternalError(
+                        strings::Substitute("Java-Udaf register_func_id meet error and error is $0",
+                                            s.get_error_msg()));
+            }
+            return s;
+        };
+
+        RETURN_IF_ERROR(register_id("<init>", UDAF_EXECUTOR_CTOR_SIGNATURE, executor_ctor_id));
+        RETURN_IF_ERROR(register_id("add", UDAF_EXECUTOR_ADD_SIGNATURE, executor_add_id));
+        RETURN_IF_ERROR(register_id("close", UDAF_EXECUTOR_CLOSE_SIGNATURE, executor_close_id));
+        RETURN_IF_ERROR(register_id("merge", UDAF_EXECUTOR_MERGE_SIGNATURE, executor_merge_id));
+        RETURN_IF_ERROR(
+                register_id("serialize", UDAF_EXECUTOR_SERIALIZE_SIGNATURE, executor_serialize_id));
+        RETURN_IF_ERROR(
+                register_id("getValue", UDAF_EXECUTOR_RESULT_SIGNATURE, executor_result_id));
+        return Status::OK();
+    }
+
+private:
+    jclass executor_cl;  // NOTE(review): the JNI handles below, and first_init, have no default member initializers
+    jobject executor_obj;
+    jmethodID executor_ctor_id;
+
+    jmethodID executor_add_id;
+    jmethodID executor_merge_id;
+    jmethodID executor_serialize_id;
+    jmethodID executor_result_id;
+    jmethodID executor_close_id;
+
+    std::unique_ptr<int64_t[]> input_values_buffer_ptr;
+    std::unique_ptr<int64_t[]> input_nulls_buffer_ptr;
+    std::unique_ptr<int64_t[]> input_offsets_ptrs;
+    std::unique_ptr<int64_t> output_value_buffer;
+    std::unique_ptr<int64_t> output_null_value;
+    std::unique_ptr<int64_t> output_offsets_ptr;
+    std::unique_ptr<int64_t> output_intermediate_state_ptr;
+
+    bool first_init;  // true until init_udaf has created the executor
+    int argument_size = 0;
+    std::string serialize_data;  // last serialized Java state, filled by write() or read()
+};
+
+class AggregateJavaUdaf final
+        : public IAggregateFunctionDataHelper<AggregateJavaUdafData, AggregateJavaUdaf> {
+public:
+    AggregateJavaUdaf(const TFunction& fn, const DataTypes& argument_types, const Array& parameters,
+                      const DataTypePtr& return_type)
+            : IAggregateFunctionDataHelper(argument_types, parameters),
+              _fn(fn),
+              _return_type(return_type) {}
+    ~AggregateJavaUdaf() = default;
+
+    static AggregateFunctionPtr create(const TFunction& fn, const DataTypes& argument_types,
+                                       const Array& parameters, const DataTypePtr& return_type) {
+        return std::make_shared<AggregateJavaUdaf>(fn, argument_types, parameters, return_type);
+    }
+
+    void create(AggregateDataPtr __restrict place) const override {
+        new (place) Data(argument_types.size());
+        Status status = Status::OK();
+        RETURN_IF_STATUS_ERROR(status, data(place).init_udaf(_fn));

Review Comment:
   use `RETURN_IF_ERROR`



##########
be/src/vec/aggregate_functions/aggregate_function_java_udaf.h:
##########
@@ -0,0 +1,358 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#ifdef LIBJVM
+
+#include <jni.h>
+#include <unistd.h>
+
+#include <cstdint>
+#include <memory>
+
+#include "common/status.h"
+#include "gen_cpp/Exprs_types.h"
+#include "runtime/user_function_cache.h"
+#include "util/jni-util.h"
+#include "vec/aggregate_functions/aggregate_function.h"
+#include "vec/columns/column_string.h"
+#include "vec/common/exception.h"
+#include "vec/common/string_ref.h"
+#include "vec/core/block.h"
+#include "vec/core/column_numbers.h"
+#include "vec/core/field.h"
+#include "vec/core/types.h"
+#include "vec/data_types/data_type_string.h"
+#include "vec/io/io_helper.h"
+
+namespace doris::vectorized {
+
+// JNI class name and method descriptors of the Java-side UDAF executor.
+// These are defined in a header, so they are `constexpr`: a const namespace-scope variable has
+// internal linkage, which avoids duplicate-symbol link errors when several translation units
+// include this file. (A plain `const char*` is a non-const pointer with external linkage.)
+constexpr const char* UDAF_EXECUTOR_CLASS = "org/apache/doris/udf/UdafExecutor";
+constexpr const char* UDAF_EXECUTOR_CTOR_SIGNATURE = "([B)V";
+constexpr const char* UDAF_EXECUTOR_CLOSE_SIGNATURE = "()V";
+constexpr const char* UDAF_EXECUTOR_ADD_SIGNATURE = "(JJ)V";
+constexpr const char* UDAF_EXECUTOR_SERIALIZE_SIGNATURE = "()[B";
+constexpr const char* UDAF_EXECUTOR_MERGE_SIGNATURE = "([B)V";
+constexpr const char* UDAF_EXECUTOR_RESULT_SIGNATURE = "(J)Z";
+// Each signature above is a JNI method descriptor of the form "(argument-types)return-type", see:
+// https://www.iitk.ac.in/esc101/05Aug/tutorial/native1.1/implementing/method.html
+
+struct AggregateJavaUdafData {
+public:
+    // Leaves the state without any JNI reference; init_udaf() has not run yet.
+    AggregateJavaUdafData() : executor_cl(nullptr), executor_obj(nullptr), first_init(true) {}
+
+    // Allocates one input address slot per UDAF argument and one slot per output pointer.
+    // The JNI references start out null so the destructor can tell whether init_udaf()
+    // acquired them.
+    explicit AggregateJavaUdafData(int64_t num_args)
+            : executor_cl(nullptr),
+              executor_obj(nullptr),
+              input_values_buffer_ptr(std::make_unique<int64_t[]>(static_cast<size_t>(num_args))),
+              input_nulls_buffer_ptr(std::make_unique<int64_t[]>(static_cast<size_t>(num_args))),
+              input_offsets_ptrs(std::make_unique<int64_t[]>(static_cast<size_t>(num_args))),
+              output_value_buffer(std::make_unique<int64_t>()),
+              output_null_value(std::make_unique<int64_t>()),
+              output_offsets_ptr(std::make_unique<int64_t>()),
+              output_intermediate_state_ptr(std::make_unique<int64_t>()),
+              first_init(true),
+              argument_size(static_cast<int>(num_args)) {}
+
+    // Calls the executor's close() as a best effort, then releases every JNI global reference
+    // that init_udaf() acquired. This also covers the case where init_udaf() failed part-way
+    // or never ran.
+    ~AggregateJavaUdafData() {
+        if (executor_obj == nullptr && executor_cl == nullptr) {
+            return;
+        }
+        JNIEnv* env = nullptr;
+        if (!JniUtil::GetJNIEnv(&env).ok()) {
+            return;
+        }
+        if (executor_obj != nullptr) {
+            env->CallNonvirtualVoidMethod(executor_obj, executor_cl, executor_close_id);
+            // A destructor cannot report the status; the global reference is released
+            // regardless of whether close() raised.
+            static_cast<void>(JniUtil::GetJniExceptionMsg(env));
+            env->DeleteGlobalRef(executor_obj);
+            executor_obj = nullptr;
+        }
+        if (executor_cl != nullptr) {
+            // init_udaf() obtains this through JniUtil::GetGlobalClassRef, once per state.
+            env->DeleteGlobalRef(executor_cl);
+            executor_cl = nullptr;
+        }
+    }
+
+    // Creates the Java executor the first time it is called; later calls return OK immediately.
+    // It resolves the executor class and its method IDs, locates the UDF jar through
+    // UserFunctionCache, and passes the addresses of this state's slots to the executor
+    // constructor inside a serialized TJavaUdfExecutorCtorParams.
+    Status init_udaf(const TFunction& fn) {
+        if (!first_init) {
+            return Status::OK();
+        }
+        JNIEnv* env = nullptr;
+        RETURN_NOT_OK_STATUS_WITH_WARN(JniUtil::GetJNIEnv(&env), "Java-Udaf init_udaf function");
+        RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, UDAF_EXECUTOR_CLASS, &executor_cl));
+        RETURN_NOT_OK_STATUS_WITH_WARN(register_func_id(env),
+                                       "Java-Udaf register_func_id function");
+
+        // Scoped JNI frame: local references created after push() go away with jni_frame.
+        JniLocalFrame jni_frame;
+        {
+            std::string jar_path;
+            RETURN_IF_ERROR(UserFunctionCache::instance()->get_jarpath(
+                    fn.id, fn.hdfs_location, fn.checksum, &jar_path));
+
+            TJavaUdfExecutorCtorParams params;
+            params.__set_fn(fn);
+            params.__set_location(jar_path);
+            params.__set_input_offsets_ptrs(reinterpret_cast<int64_t>(input_offsets_ptrs.get()));
+            params.__set_input_buffer_ptrs(
+                    reinterpret_cast<int64_t>(input_values_buffer_ptr.get()));
+            params.__set_input_nulls_ptrs(reinterpret_cast<int64_t>(input_nulls_buffer_ptr.get()));
+            params.__set_output_buffer_ptr(reinterpret_cast<int64_t>(output_value_buffer.get()));
+            params.__set_output_null_ptr(reinterpret_cast<int64_t>(output_null_value.get()));
+            params.__set_output_offsets_ptr(reinterpret_cast<int64_t>(output_offsets_ptr.get()));
+            params.__set_output_intermediate_state_ptr(
+                    reinterpret_cast<int64_t>(output_intermediate_state_ptr.get()));
+
+            jbyteArray serialized_params = nullptr;
+            RETURN_IF_ERROR(jni_frame.push(env));
+            RETURN_IF_ERROR(SerializeThriftMsg(env, &params, &serialized_params));
+            executor_obj = env->NewObject(executor_cl, executor_ctor_id, serialized_params);
+        }
+        RETURN_ERROR_IF_EXC(env);
+        // Promote the executor to a global reference before the local frame is popped.
+        RETURN_IF_ERROR(JniUtil::LocalToGlobalRef(env, executor_obj, &executor_obj));
+        first_init = false;
+        return Status::OK();
+    }
+
+    // Points the input slots at the argument columns' buffers, then calls the executor's
+    // add(row_num_start, row_num_end). A null slot of -1 marks a non-nullable argument.
+    // String arguments also publish their offsets array; other types must be numeric or
+    // decimal.
+    Status add(const IColumn** columns, size_t row_num_start, size_t row_num_end,
+               const DataTypes& argument_types) {
+        JNIEnv* env = nullptr;
+        RETURN_NOT_OK_STATUS_WITH_WARN(JniUtil::GetJNIEnv(&env), "Java-Udaf add function");
+        int64_t* value_slots = input_values_buffer_ptr.get();
+        int64_t* null_slots = input_nulls_buffer_ptr.get();
+        int64_t* offset_slots = input_offsets_ptrs.get();
+
+        for (int i = 0; i < argument_size; ++i) {
+            const IColumn* column = columns[i];
+            const auto* nullable = check_and_get_column<const ColumnNullable>(*columns[i]);
+            if (nullable != nullptr) {
+                column = nullable->get_nested_column_ptr();
+                const auto* null_map = check_and_get_column<ColumnVector<UInt8>>(
+                        nullable->get_null_map_column_ptr());
+                null_slots[i] = reinterpret_cast<int64_t>(null_map->get_data().data());
+            } else {
+                null_slots[i] = -1;
+            }
+
+            if (column->is_column_string()) {
+                const auto* str_column = check_and_get_column<ColumnString>(column);
+                value_slots[i] = reinterpret_cast<int64_t>(str_column->get_chars().data());
+                offset_slots[i] = reinterpret_cast<int64_t>(str_column->get_offsets().data());
+                continue;
+            }
+            if (!column->is_numeric() && !column->is_column_decimal()) {
+                return Status::InvalidArgument(
+                        strings::Substitute("Java UDAF doesn't support type is $0 now !",
+                                            argument_types[i]->get_name()));
+            }
+            value_slots[i] = reinterpret_cast<int64_t>(column->get_raw_data().data);
+        }
+
+        env->CallNonvirtualVoidMethod(executor_obj, executor_cl, executor_add_id, row_num_start,
+                                      row_num_end);
+        return JniUtil::GetJniExceptionMsg(env);
+    }
+
+    // Hands rhs's serialized state (rhs.serialize_data) to the executor's merge(byte[]).
+    // Returns any Java exception raised by merge() as an error Status.
+    Status merge(const AggregateJavaUdafData& rhs) {
+        JNIEnv* env = nullptr;
+        RETURN_NOT_OK_STATUS_WITH_WARN(JniUtil::GetJNIEnv(&env), "Java-Udaf merge function");
+        const jsize len = static_cast<jsize>(rhs.serialize_data.length());
+        jbyteArray arr = env->NewByteArray(len);
+        // NewByteArray returns null with a pending exception on failure.
+        RETURN_ERROR_IF_EXC(env);
+        env->SetByteArrayRegion(arr, 0, len,
+                                reinterpret_cast<const jbyte*>(rhs.serialize_data.data()));
+        env->CallNonvirtualVoidMethod(executor_obj, executor_cl, executor_merge_id, arr);
+        // No JNI local frame is pushed here (unlike init_udaf), so release the array
+        // explicitly; otherwise one local reference leaks per merge.
+        env->DeleteLocalRef(arr);
+        return JniUtil::GetJniExceptionMsg(env);
+    }
+
+    // Serializes this state into buf as first_init followed by the executor's serialize()
+    // bytes. Nothing is written to buf unless the Java state was fetched successfully.
+    Status write(BufferWritable& buf) {
+        JNIEnv* env = nullptr;
+        RETURN_NOT_OK_STATUS_WITH_WARN(JniUtil::GetJNIEnv(&env), "Java-Udaf write function");
+        // TODO: The byte[] returned by the Java executor's serialize() is copied into a
+        //       BE-owned buffer; it is unclear whether the byte[] can be used without the copy.
+        jbyteArray arr = static_cast<jbyteArray>(
+                env->CallNonvirtualObjectMethod(executor_obj, executor_cl, executor_serialize_id));
+        // Check before touching arr: a Java exception leaves it null.
+        RETURN_IF_ERROR(JniUtil::GetJniExceptionMsg(env));
+        if (arr == nullptr) {
+            return Status::InternalError("Java-Udaf serialize returned null");
+        }
+        const jsize len = env->GetArrayLength(arr);
+        serialize_data.resize(len);
+        env->GetByteArrayRegion(arr, 0, len, reinterpret_cast<jbyte*>(serialize_data.data()));
+        // No JNI local frame is pushed here, so release the array explicitly.
+        env->DeleteLocalRef(arr);
+        RETURN_IF_ERROR(JniUtil::GetJniExceptionMsg(env));
+
+        write_binary(first_init, buf);
+        write_binary(serialize_data, buf);
+        return Status::OK();
+    }
+
+    // Restores the two fields emitted by write(), in the same order. The restored bytes stay in
+    // serialize_data; presumably this state is later passed as the rhs of merge().
+    void read(BufferReadable& buf) {
+        read_binary(this->first_init, buf);
+        read_binary(this->serialize_data, buf);
+    }
+
+    // Appends one row to `to` and asks the executor's getValue(row) to write the aggregate
+    // result into it through the output slots. For string results, a non-true return is
+    // treated as "buffer too small": the chars buffer grows and the call is retried.
+    // Unsupported result types yield InvalidArgument.
+    Status get(IColumn& to, const DataTypePtr& result_type) const {
+        to.insert_default();
+        JNIEnv* env = nullptr;
+        RETURN_NOT_OK_STATUS_WITH_WARN(JniUtil::GetJNIEnv(&env), "Java-Udaf get value function");
+        const jlong row = static_cast<jlong>(to.size()) - 1;
+
+        // Evaluates into a non-nullable data column. This is shared by both paths below, so
+        // getValue is invoked exactly once per result type.
+        auto evaluate = [&](IColumn& data_col) -> Status {
+            if (data_col.is_column_string()) {
+                auto& str_col = assert_cast<ColumnString&>(data_col);
+                ColumnString::Chars& chars = str_col.get_chars();
+                ColumnString::Offsets& offsets = str_col.get_offsets();
+                *output_value_buffer = reinterpret_cast<int64_t>(chars.data());
+                *output_offsets_ptr = reinterpret_cast<int64_t>(offsets.data());
+                *output_intermediate_state_ptr = chars.size();
+                jboolean res = env->CallNonvirtualBooleanMethod(executor_obj, executor_cl,
+                                                                executor_result_id, row);
+                int increase_buffer_size = 0;
+                while (res != JNI_TRUE) {
+                    // A Java exception also produces a non-true result; stop instead of
+                    // growing the buffer forever.
+                    RETURN_IF_ERROR(JniUtil::GetJniExceptionMsg(env));
+                    const int32_t buffer_size =
+                            JniUtil::IncreaseReservedBufferSize(increase_buffer_size++);
+                    chars.resize(chars.size() + buffer_size);
+                    // resize() may reallocate, so republish the (possibly moved) address.
+                    *output_value_buffer = reinterpret_cast<int64_t>(chars.data());
+                    *output_intermediate_state_ptr = chars.size();
+                    res = env->CallNonvirtualBooleanMethod(executor_obj, executor_cl,
+                                                           executor_result_id, row);
+                }
+                return Status::OK();
+            }
+            if (data_col.is_numeric() || data_col.is_column_decimal()) {
+                *output_value_buffer = reinterpret_cast<int64_t>(data_col.get_raw_data().data);
+                env->CallNonvirtualBooleanMethod(executor_obj, executor_cl, executor_result_id,
+                                                 row);
+                return Status::OK();
+            }
+            return Status::InvalidArgument(strings::Substitute(
+                    "Java UDAF doesn't support return type is $0 now !", result_type->get_name()));
+        };
+
+        if (result_type->is_nullable()) {
+            auto& nullable = assert_cast<ColumnNullable&>(to);
+            *output_null_value =
+                    reinterpret_cast<int64_t>(nullable.get_null_map_column().get_raw_data().data);
+            RETURN_IF_ERROR(evaluate(nullable.get_nested_column()));
+        } else {
+            // -1 tells the executor there is no null map to write.
+            *output_null_value = -1;
+            RETURN_IF_ERROR(evaluate(to));
+        }
+        return JniUtil::GetJniExceptionMsg(env);
+    }
+
+private:
+    // Resolves every executor method ID used by this state, in a fixed order, stopping at the
+    // first lookup that raises a Java exception.
+    Status register_func_id(JNIEnv* env) {
+        struct MethodSpec {
+            const char* name;
+            const char* signature;
+            jmethodID* id;
+        };
+        const MethodSpec specs[] = {
+                {"<init>", UDAF_EXECUTOR_CTOR_SIGNATURE, &executor_ctor_id},
+                {"add", UDAF_EXECUTOR_ADD_SIGNATURE, &executor_add_id},
+                {"close", UDAF_EXECUTOR_CLOSE_SIGNATURE, &executor_close_id},
+                {"merge", UDAF_EXECUTOR_MERGE_SIGNATURE, &executor_merge_id},
+                {"serialize", UDAF_EXECUTOR_SERIALIZE_SIGNATURE, &executor_serialize_id},
+                {"getValue", UDAF_EXECUTOR_RESULT_SIGNATURE, &executor_result_id},
+        };
+        for (const MethodSpec& spec : specs) {
+            *spec.id = env->GetMethodID(executor_cl, spec.name, spec.signature);
+            Status lookup_status = JniUtil::GetJniExceptionMsg(env);
+            if (!lookup_status.ok()) {
+                return Status::InternalError(
+                        strings::Substitute("Java-Udaf register_func_id meet error and error is $0",
+                                            lookup_status.get_error_msg()));
+            }
+        }
+        return Status::OK();
+    }
+
+private:
+    // Global reference to the org/apache/doris/udf/UdafExecutor class (init_udaf).
+    jclass executor_cl;
+    // The executor instance, promoted to a global reference in init_udaf.
+    jobject executor_obj;
+    // Method IDs below are resolved by register_func_id().
+    jmethodID executor_ctor_id;
+
+    jmethodID executor_add_id;
+    jmethodID executor_merge_id;
+    jmethodID executor_serialize_id;
+    jmethodID executor_result_id;
+    jmethodID executor_close_id;
+
+    // Per-argument address slots filled by add(); the addresses of these arrays are handed to
+    // the executor through TJavaUdfExecutorCtorParams in init_udaf.
+    std::unique_ptr<int64_t[]> input_values_buffer_ptr;
+    std::unique_ptr<int64_t[]> input_nulls_buffer_ptr;
+    std::unique_ptr<int64_t[]> input_offsets_ptrs;
+    // Single output slots filled by get() before calling getValue; for string results
+    // output_intermediate_state_ptr carries the current chars buffer size.
+    std::unique_ptr<int64_t> output_value_buffer;
+    std::unique_ptr<int64_t> output_null_value;
+    std::unique_ptr<int64_t> output_offsets_ptr;
+    std::unique_ptr<int64_t> output_intermediate_state_ptr;
+
+    // True until init_udaf() has created the executor; also round-tripped by write()/read().
+    bool first_init;

Review Comment:
   This variable name reads a bit oddly: `init` already implies `first`. Maybe rename it to `inited` or `first_called`?



##########
be/src/vec/aggregate_functions/aggregate_function_java_udaf.h:
##########
@@ -0,0 +1,358 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#ifdef LIBJVM
+
+#include <jni.h>
+#include <unistd.h>
+
+#include <cstdint>
+#include <memory>
+
+#include "common/status.h"
+#include "gen_cpp/Exprs_types.h"
+#include "runtime/user_function_cache.h"
+#include "util/jni-util.h"
+#include "vec/aggregate_functions/aggregate_function.h"
+#include "vec/columns/column_string.h"
+#include "vec/common/exception.h"
+#include "vec/common/string_ref.h"
+#include "vec/core/block.h"
+#include "vec/core/column_numbers.h"
+#include "vec/core/field.h"
+#include "vec/core/types.h"
+#include "vec/data_types/data_type_string.h"
+#include "vec/io/io_helper.h"
+
+namespace doris::vectorized {
+
+// JNI class name and method descriptors of the Java-side UDAF executor.
+// These are defined in a header, so they are `constexpr`: a const namespace-scope variable has
+// internal linkage, which avoids duplicate-symbol link errors when several translation units
+// include this file. (A plain `const char*` is a non-const pointer with external linkage.)
+constexpr const char* UDAF_EXECUTOR_CLASS = "org/apache/doris/udf/UdafExecutor";
+constexpr const char* UDAF_EXECUTOR_CTOR_SIGNATURE = "([B)V";
+constexpr const char* UDAF_EXECUTOR_CLOSE_SIGNATURE = "()V";
+constexpr const char* UDAF_EXECUTOR_ADD_SIGNATURE = "(JJ)V";
+constexpr const char* UDAF_EXECUTOR_SERIALIZE_SIGNATURE = "()[B";
+constexpr const char* UDAF_EXECUTOR_MERGE_SIGNATURE = "([B)V";
+constexpr const char* UDAF_EXECUTOR_RESULT_SIGNATURE = "(J)Z";
+// Each signature above is a JNI method descriptor of the form "(argument-types)return-type", see:
+// https://www.iitk.ac.in/esc101/05Aug/tutorial/native1.1/implementing/method.html
+
+// Per-group state of a Java UDAF.
+//
+// Each instance owns one Java org/apache/doris/udf/UdafExecutor object, created lazily by
+// init_udaf(). It also owns a set of int64 "address slots" whose addresses are passed to the
+// executor constructor. add() and get() store column buffer addresses into those slots before
+// calling into Java.
+struct AggregateJavaUdafData {
+public:
+    AggregateJavaUdafData() = default;
+
+    // Allocates one input address slot per UDAF argument and one slot per output pointer.
+    explicit AggregateJavaUdafData(int64_t num_args)
+            : input_values_buffer_ptr(std::make_unique<int64_t[]>(static_cast<size_t>(num_args))),
+              input_nulls_buffer_ptr(std::make_unique<int64_t[]>(static_cast<size_t>(num_args))),
+              input_offsets_ptrs(std::make_unique<int64_t[]>(static_cast<size_t>(num_args))),
+              output_value_buffer(std::make_unique<int64_t>()),
+              output_null_value(std::make_unique<int64_t>()),
+              output_offsets_ptr(std::make_unique<int64_t>()),
+              output_intermediate_state_ptr(std::make_unique<int64_t>()),
+              argument_size(static_cast<int>(num_args)) {}
+
+    // The state owns JNI global references, so it must not be copied.
+    AggregateJavaUdafData(const AggregateJavaUdafData&) = delete;
+    AggregateJavaUdafData& operator=(const AggregateJavaUdafData&) = delete;
+
+    // Calls the executor's close() as a best effort, then releases every JNI global reference
+    // that init_udaf() acquired. This also covers the case where init_udaf() failed part-way
+    // or never ran.
+    ~AggregateJavaUdafData() {
+        if (executor_obj == nullptr && executor_cl == nullptr) {
+            return;
+        }
+        JNIEnv* env = nullptr;
+        if (!JniUtil::GetJNIEnv(&env).ok()) {
+            return;
+        }
+        if (executor_obj != nullptr) {
+            env->CallNonvirtualVoidMethod(executor_obj, executor_cl, executor_close_id);
+            // A destructor cannot report the status; the global reference is released
+            // regardless of whether close() raised.
+            static_cast<void>(JniUtil::GetJniExceptionMsg(env));
+            env->DeleteGlobalRef(executor_obj);
+            executor_obj = nullptr;
+        }
+        if (executor_cl != nullptr) {
+            env->DeleteGlobalRef(executor_cl);
+            executor_cl = nullptr;
+        }
+    }
+
+    // Creates the Java executor the first time it is called; later calls return OK immediately.
+    // It resolves the executor class and its method IDs, locates the UDF jar through
+    // UserFunctionCache, and passes the addresses of this state's slots to the executor
+    // constructor inside a serialized TJavaUdfExecutorCtorParams.
+    Status init_udaf(const TFunction& fn) {
+        if (!first_init) {
+            return Status::OK();
+        }
+        JNIEnv* env = nullptr;
+        RETURN_NOT_OK_STATUS_WITH_WARN(JniUtil::GetJNIEnv(&env), "Java-Udaf init_udaf function");
+        RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, UDAF_EXECUTOR_CLASS, &executor_cl));
+        RETURN_NOT_OK_STATUS_WITH_WARN(register_func_id(env),
+                                       "Java-Udaf register_func_id function");
+
+        // Scoped JNI frame: local references created after push() go away with jni_frame.
+        JniLocalFrame jni_frame;
+        jobject local_executor = nullptr;
+        {
+            std::string local_location;
+            auto function_cache = UserFunctionCache::instance();
+            RETURN_IF_ERROR(function_cache->get_jarpath(fn.id, fn.hdfs_location, fn.checksum,
+                                                        &local_location));
+            TJavaUdfExecutorCtorParams ctor_params;
+            ctor_params.__set_fn(fn);
+            ctor_params.__set_location(local_location);
+            ctor_params.__set_input_offsets_ptrs(
+                    reinterpret_cast<int64_t>(input_offsets_ptrs.get()));
+            ctor_params.__set_input_buffer_ptrs(
+                    reinterpret_cast<int64_t>(input_values_buffer_ptr.get()));
+            ctor_params.__set_input_nulls_ptrs(
+                    reinterpret_cast<int64_t>(input_nulls_buffer_ptr.get()));
+            ctor_params.__set_output_buffer_ptr(
+                    reinterpret_cast<int64_t>(output_value_buffer.get()));
+            ctor_params.__set_output_null_ptr(reinterpret_cast<int64_t>(output_null_value.get()));
+            ctor_params.__set_output_offsets_ptr(
+                    reinterpret_cast<int64_t>(output_offsets_ptr.get()));
+            ctor_params.__set_output_intermediate_state_ptr(
+                    reinterpret_cast<int64_t>(output_intermediate_state_ptr.get()));
+
+            jbyteArray ctor_params_bytes = nullptr;
+            RETURN_IF_ERROR(jni_frame.push(env));
+            RETURN_IF_ERROR(SerializeThriftMsg(env, &ctor_params, &ctor_params_bytes));
+            local_executor = env->NewObject(executor_cl, executor_ctor_id, ctor_params_bytes);
+        }
+        RETURN_ERROR_IF_EXC(env);
+        // Promote to a global reference before the local frame is popped. executor_obj only
+        // ever holds a global reference (or null), which the destructor relies on.
+        jobject global_executor = nullptr;
+        RETURN_IF_ERROR(JniUtil::LocalToGlobalRef(env, local_executor, &global_executor));
+        executor_obj = global_executor;
+        first_init = false;
+        return Status::OK();
+    }
+
+    // Points the input slots at the argument columns' buffers, then calls the executor's
+    // add(row_num_start, row_num_end). A null slot of -1 marks a non-nullable argument.
+    // String arguments also publish their offsets array; other types must be numeric or
+    // decimal.
+    Status add(const IColumn** columns, size_t row_num_start, size_t row_num_end,
+               const DataTypes& argument_types) {
+        RETURN_IF_ERROR(check_executor());
+        JNIEnv* env = nullptr;
+        RETURN_NOT_OK_STATUS_WITH_WARN(JniUtil::GetJNIEnv(&env), "Java-Udaf add function");
+        for (int arg_idx = 0; arg_idx < argument_size; ++arg_idx) {
+            auto data_col = columns[arg_idx];
+            if (auto* nullable = check_and_get_column<const ColumnNullable>(*columns[arg_idx])) {
+                data_col = nullable->get_nested_column_ptr();
+                auto null_col = check_and_get_column<ColumnVector<UInt8>>(
+                        nullable->get_null_map_column_ptr());
+                input_nulls_buffer_ptr.get()[arg_idx] =
+                        reinterpret_cast<int64_t>(null_col->get_data().data());
+            } else {
+                input_nulls_buffer_ptr.get()[arg_idx] = -1;
+            }
+            if (data_col->is_column_string()) {
+                const ColumnString* str_col = check_and_get_column<ColumnString>(data_col);
+                input_values_buffer_ptr.get()[arg_idx] =
+                        reinterpret_cast<int64_t>(str_col->get_chars().data());
+                input_offsets_ptrs.get()[arg_idx] =
+                        reinterpret_cast<int64_t>(str_col->get_offsets().data());
+            } else if (data_col->is_numeric() || data_col->is_column_decimal()) {
+                input_values_buffer_ptr.get()[arg_idx] =
+                        reinterpret_cast<int64_t>(data_col->get_raw_data().data);
+            } else {
+                return Status::InvalidArgument(
+                        strings::Substitute("Java UDAF doesn't support type is $0 now !",
+                                            argument_types[arg_idx]->get_name()));
+            }
+        }
+        env->CallNonvirtualVoidMethod(executor_obj, executor_cl, executor_add_id, row_num_start,
+                                      row_num_end);
+        return JniUtil::GetJniExceptionMsg(env);
+    }
+
+    // Hands rhs's serialized state (rhs.serialize_data) to the executor's merge(byte[]).
+    Status merge(const AggregateJavaUdafData& rhs) {
+        RETURN_IF_ERROR(check_executor());
+        JNIEnv* env = nullptr;
+        RETURN_NOT_OK_STATUS_WITH_WARN(JniUtil::GetJNIEnv(&env), "Java-Udaf merge function");
+        const jsize len = static_cast<jsize>(rhs.serialize_data.length());
+        jbyteArray arr = env->NewByteArray(len);
+        // NewByteArray returns null with a pending exception on failure.
+        RETURN_ERROR_IF_EXC(env);
+        env->SetByteArrayRegion(arr, 0, len,
+                                reinterpret_cast<const jbyte*>(rhs.serialize_data.data()));
+        env->CallNonvirtualVoidMethod(executor_obj, executor_cl, executor_merge_id, arr);
+        // No JNI local frame is pushed here, so release the array explicitly; otherwise one
+        // local reference leaks per merge.
+        env->DeleteLocalRef(arr);
+        return JniUtil::GetJniExceptionMsg(env);
+    }
+
+    // Serializes this state into buf as first_init followed by the executor's serialize()
+    // bytes. Nothing is written to buf unless the Java state was fetched successfully.
+    Status write(BufferWritable& buf) {
+        RETURN_IF_ERROR(check_executor());
+        JNIEnv* env = nullptr;
+        RETURN_NOT_OK_STATUS_WITH_WARN(JniUtil::GetJNIEnv(&env), "Java-Udaf write function");
+        // TODO: The byte[] returned by the Java executor's serialize() is copied into a
+        //       BE-owned buffer; it is unclear whether the byte[] can be used without the copy.
+        jbyteArray arr = static_cast<jbyteArray>(
+                env->CallNonvirtualObjectMethod(executor_obj, executor_cl, executor_serialize_id));
+        // Check before touching arr: a Java exception leaves it null.
+        RETURN_IF_ERROR(JniUtil::GetJniExceptionMsg(env));
+        if (arr == nullptr) {
+            return Status::InternalError("Java-Udaf serialize returned null");
+        }
+        const jsize len = env->GetArrayLength(arr);
+        serialize_data.resize(len);
+        env->GetByteArrayRegion(arr, 0, len, reinterpret_cast<jbyte*>(serialize_data.data()));
+        env->DeleteLocalRef(arr);
+        RETURN_IF_ERROR(JniUtil::GetJniExceptionMsg(env));
+
+        write_binary(first_init, buf);
+        write_binary(serialize_data, buf);
+        return Status::OK();
+    }
+
+    // Restores the fields emitted by write(), in the same order. The bytes stay in
+    // serialize_data; presumably this state is later passed as the rhs of merge().
+    void read(BufferReadable& buf) {
+        read_binary(first_init, buf);
+        read_binary(serialize_data, buf);
+    }
+
+    // Appends one row to `to` and asks the executor's getValue(row) to write the aggregate
+    // result into it through the output slots. For string results, a non-true return is
+    // treated as "buffer too small": the chars buffer grows and the call is retried.
+    Status get(IColumn& to, const DataTypePtr& result_type) const {
+        to.insert_default();
+        RETURN_IF_ERROR(check_executor());
+        JNIEnv* env = nullptr;
+        RETURN_NOT_OK_STATUS_WITH_WARN(JniUtil::GetJNIEnv(&env), "Java-Udaf get value function");
+        const jlong row = static_cast<jlong>(to.size()) - 1;
+
+        // Evaluates into a non-nullable data column; getValue is invoked exactly once per type.
+        auto evaluate = [&](IColumn& data_col) -> Status {
+            if (data_col.is_column_string()) {
+                auto& str_col = assert_cast<ColumnString&>(data_col);
+                ColumnString::Chars& chars = str_col.get_chars();
+                ColumnString::Offsets& offsets = str_col.get_offsets();
+                *output_value_buffer = reinterpret_cast<int64_t>(chars.data());
+                *output_offsets_ptr = reinterpret_cast<int64_t>(offsets.data());
+                *output_intermediate_state_ptr = chars.size();
+                jboolean res = env->CallNonvirtualBooleanMethod(executor_obj, executor_cl,
+                                                                executor_result_id, row);
+                int increase_buffer_size = 0;
+                while (res != JNI_TRUE) {
+                    // A Java exception also produces a non-true result; stop instead of
+                    // growing the buffer forever.
+                    RETURN_IF_ERROR(JniUtil::GetJniExceptionMsg(env));
+                    const int32_t buffer_size =
+                            JniUtil::IncreaseReservedBufferSize(increase_buffer_size++);
+                    chars.resize(chars.size() + buffer_size);
+                    // resize() may reallocate, so republish the (possibly moved) address.
+                    *output_value_buffer = reinterpret_cast<int64_t>(chars.data());
+                    *output_intermediate_state_ptr = chars.size();
+                    res = env->CallNonvirtualBooleanMethod(executor_obj, executor_cl,
+                                                           executor_result_id, row);
+                }
+                return Status::OK();
+            }
+            if (data_col.is_numeric() || data_col.is_column_decimal()) {
+                *output_value_buffer = reinterpret_cast<int64_t>(data_col.get_raw_data().data);
+                env->CallNonvirtualBooleanMethod(executor_obj, executor_cl, executor_result_id,
+                                                 row);
+                return Status::OK();
+            }
+            return Status::InvalidArgument(strings::Substitute(
+                    "Java UDAF doesn't support return type is $0 now !", result_type->get_name()));
+        };
+
+        if (result_type->is_nullable()) {
+            auto& nullable = assert_cast<ColumnNullable&>(to);
+            *output_null_value =
+                    reinterpret_cast<int64_t>(nullable.get_null_map_column().get_raw_data().data);
+            RETURN_IF_ERROR(evaluate(nullable.get_nested_column()));
+        } else {
+            // -1 tells the executor there is no null map to write.
+            *output_null_value = -1;
+            RETURN_IF_ERROR(evaluate(to));
+        }
+        return JniUtil::GetJniExceptionMsg(env);
+    }
+
+private:
+    // Fails instead of letting JNI dereference a null executor when init_udaf() did not
+    // succeed. AggregateJavaUdaf::create() does not return init_udaf()'s Status.
+    Status check_executor() const {
+        if (executor_obj == nullptr) {
+            return Status::InternalError("Java-Udaf executor is not initialized");
+        }
+        return Status::OK();
+    }
+
+    // Resolves every executor method ID, in a fixed order, stopping at the first lookup that
+    // raises a Java exception.
+    Status register_func_id(JNIEnv* env) {
+        struct MethodSpec {
+            const char* name;
+            const char* signature;
+            jmethodID* id;
+        };
+        const MethodSpec specs[] = {
+                {"<init>", UDAF_EXECUTOR_CTOR_SIGNATURE, &executor_ctor_id},
+                {"add", UDAF_EXECUTOR_ADD_SIGNATURE, &executor_add_id},
+                {"close", UDAF_EXECUTOR_CLOSE_SIGNATURE, &executor_close_id},
+                {"merge", UDAF_EXECUTOR_MERGE_SIGNATURE, &executor_merge_id},
+                {"serialize", UDAF_EXECUTOR_SERIALIZE_SIGNATURE, &executor_serialize_id},
+                {"getValue", UDAF_EXECUTOR_RESULT_SIGNATURE, &executor_result_id},
+        };
+        for (const MethodSpec& spec : specs) {
+            *spec.id = env->GetMethodID(executor_cl, spec.name, spec.signature);
+            Status s = JniUtil::GetJniExceptionMsg(env);
+            if (!s.ok()) {
+                return Status::InternalError(
+                        strings::Substitute("Java-Udaf register_func_id meet error and error is $0",
+                                            s.get_error_msg()));
+            }
+        }
+        return Status::OK();
+    }
+
+private:
+    // Global reference to the UdafExecutor class, acquired in init_udaf().
+    jclass executor_cl = nullptr;
+    // Global reference to the executor instance, acquired in init_udaf().
+    jobject executor_obj = nullptr;
+    // Method IDs resolved by register_func_id().
+    jmethodID executor_ctor_id = nullptr;
+
+    jmethodID executor_add_id = nullptr;
+    jmethodID executor_merge_id = nullptr;
+    jmethodID executor_serialize_id = nullptr;
+    jmethodID executor_result_id = nullptr;
+    jmethodID executor_close_id = nullptr;
+
+    // Per-argument address slots filled by add(); their addresses are given to the executor.
+    std::unique_ptr<int64_t[]> input_values_buffer_ptr;
+    std::unique_ptr<int64_t[]> input_nulls_buffer_ptr;
+    std::unique_ptr<int64_t[]> input_offsets_ptrs;
+    // Single output slots filled by get(); for strings output_intermediate_state_ptr holds the
+    // current chars buffer size.
+    std::unique_ptr<int64_t> output_value_buffer;
+    std::unique_ptr<int64_t> output_null_value;
+    std::unique_ptr<int64_t> output_offsets_ptr;
+    std::unique_ptr<int64_t> output_intermediate_state_ptr;
+
+    // True until init_udaf() has created the executor; also round-tripped by write()/read().
+    bool first_init = true;
+    int argument_size = 0;
+    // Scratch buffer for the executor's serialized state (write()/read()/merge()).
+    std::string serialize_data;
+};
+
+class AggregateJavaUdaf final
+        : public IAggregateFunctionDataHelper<AggregateJavaUdafData, AggregateJavaUdaf> {
+public:
+    AggregateJavaUdaf(const TFunction& fn, const DataTypes& argument_types, const Array& parameters,
+                      const DataTypePtr& return_type)
+            : IAggregateFunctionDataHelper(argument_types, parameters),
+              _fn(fn),
+              _return_type(return_type) {}
+    ~AggregateJavaUdaf() = default;
+
+    static AggregateFunctionPtr create(const TFunction& fn, const DataTypes& argument_types,
+                                       const Array& parameters, const DataTypePtr& return_type) {
+        return std::make_shared<AggregateJavaUdaf>(fn, argument_types, parameters, return_type);
+    }
+
+    // Constructs the per-group state in place (one input slot per argument type) and creates
+    // its Java executor via init_udaf().
+    // NOTE(review): create() returns void, so RETURN_IF_ERROR cannot be used here. The
+    // init_udaf() Status appears to be dropped by RETURN_IF_STATUS_ERROR, which leaves the
+    // state without an executor. Confirm how this failure should reach the caller.
+    void create(AggregateDataPtr __restrict place) const override {
+        new (place) Data(argument_types.size());
+        Status status = Status::OK();
+        RETURN_IF_STATUS_ERROR(status, data(place).init_udaf(_fn));
+    }
+
+    // Reports the function name recorded in the UDAF's TFunction.
+    String get_name() const override {
+        return _fn.name.function_name;
+    }
+
+    // Result type supplied when this function object was constructed.
+    DataTypePtr get_return_type() const override {
+        return _return_type;
+    }
+
+    // TODO: Calling add() here may process only one row at a time, so performance may be poor.
+    //       It would be possible to maintain a hashtable in FE whose key is the place address
+    //       and whose value is the object; then we could call an add_batch function to
+    //       calculate the whole batch at once and avoid calling JNI multiple times.

Review Comment:
   Please add leading spaces to each comment line below the `TODO` so the continuation lines are indented under it.



##########
be/src/vec/aggregate_functions/aggregate_function_java_udaf.h:
##########
@@ -0,0 +1,358 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#ifdef LIBJVM
+
+#include <jni.h>
+#include <unistd.h>
+
+#include <cstdint>
+#include <memory>
+
+#include "common/status.h"
+#include "gen_cpp/Exprs_types.h"
+#include "runtime/user_function_cache.h"
+#include "util/jni-util.h"
+#include "vec/aggregate_functions/aggregate_function.h"
+#include "vec/columns/column_string.h"
+#include "vec/common/exception.h"
+#include "vec/common/string_ref.h"
+#include "vec/core/block.h"
+#include "vec/core/column_numbers.h"
+#include "vec/core/field.h"
+#include "vec/core/types.h"
+#include "vec/data_types/data_type_string.h"
+#include "vec/io/io_helper.h"
+
+namespace doris::vectorized {
+
+const char* UDAF_EXECUTOR_CLASS = "org/apache/doris/udf/UdafExecutor";
+const char* UDAF_EXECUTOR_CTOR_SIGNATURE = "([B)V";
+const char* UDAF_EXECUTOR_CLOSE_SIGNATURE = "()V";
+const char* UDAF_EXECUTOR_ADD_SIGNATURE = "(JJ)V";
+const char* UDAF_EXECUTOR_SERIALIZE_SIGNATURE = "()[B";
+const char* UDAF_EXECUTOR_MERGE_SIGNATURE = "([B)V";
+const char* UDAF_EXECUTOR_RESULT_SIGNATURE = "(J)Z";
+// Calling Java method about those signture means: "(argument-types)return-type"
+// https://www.iitk.ac.in/esc101/05Aug/tutorial/native1.1/implementing/method.html
+
+struct AggregateJavaUdafData {
+public:
+    AggregateJavaUdafData() = default;
+    AggregateJavaUdafData(int64_t num_args) {
+        argument_size = num_args;
+        first_init = true;
+        input_values_buffer_ptr.reset(new int64_t[num_args]);
+        input_nulls_buffer_ptr.reset(new int64_t[num_args]);
+        input_offsets_ptrs.reset(new int64_t[num_args]);
+        output_value_buffer.reset(new int64_t);
+        output_null_value.reset(new int64_t);
+        output_offsets_ptr.reset(new int64_t);
+        output_intermediate_state_ptr.reset(new int64_t);
+    }
+
+    ~AggregateJavaUdafData() {
+        JNIEnv* env;
+        Status status;
+        RETURN_IF_STATUS_ERROR(status, JniUtil::GetJNIEnv(&env));
+        env->CallNonvirtualVoidMethod(executor_obj, executor_cl, executor_close_id);
+        RETURN_IF_STATUS_ERROR(status, JniUtil::GetJniExceptionMsg(env));
+        env->DeleteGlobalRef(executor_obj);
+    }
+
+    Status init_udaf(const TFunction& fn) {
+        if (first_init) {
+            JNIEnv* env = nullptr;
+            RETURN_NOT_OK_STATUS_WITH_WARN(JniUtil::GetJNIEnv(&env),
+                                           "Java-Udaf init_udaf function");
+            RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, UDAF_EXECUTOR_CLASS, &executor_cl));
+            RETURN_NOT_OK_STATUS_WITH_WARN(register_func_id(env),
+                                           "Java-Udaf register_func_id function");
+
+            // Add a scoped cleanup jni reference object. This cleans up local refs made below.
+            JniLocalFrame jni_frame;
+            {
+                std::string local_location;
+                auto function_cache = UserFunctionCache::instance();
+                RETURN_IF_ERROR(function_cache->get_jarpath(fn.id, fn.hdfs_location, fn.checksum,
+                                                            &local_location));
+                TJavaUdfExecutorCtorParams ctor_params;
+                ctor_params.__set_fn(fn);
+                ctor_params.__set_location(local_location);
+                ctor_params.__set_input_offsets_ptrs((int64_t)input_offsets_ptrs.get());
+                ctor_params.__set_input_buffer_ptrs((int64_t)input_values_buffer_ptr.get());
+                ctor_params.__set_input_nulls_ptrs((int64_t)input_nulls_buffer_ptr.get());
+                ctor_params.__set_output_buffer_ptr((int64_t)output_value_buffer.get());
+
+                ctor_params.__set_output_null_ptr((int64_t)output_null_value.get());
+                ctor_params.__set_output_offsets_ptr((int64_t)output_offsets_ptr.get());
+                ctor_params.__set_output_intermediate_state_ptr(
+                        (int64_t)output_intermediate_state_ptr.get());
+
+                jbyteArray ctor_params_bytes;
+
+                // Pushed frame will be popped when jni_frame goes out-of-scope.
+                RETURN_IF_ERROR(jni_frame.push(env));
+                RETURN_IF_ERROR(SerializeThriftMsg(env, &ctor_params, &ctor_params_bytes));
+                executor_obj = env->NewObject(executor_cl, executor_ctor_id, ctor_params_bytes);
+            }
+            RETURN_ERROR_IF_EXC(env);
+            RETURN_IF_ERROR(JniUtil::LocalToGlobalRef(env, executor_obj, &executor_obj));
+            first_init = false;
+        }
+        return Status::OK();
+    }
+
+    Status add(const IColumn** columns, size_t row_num_start, size_t row_num_end,
+               const DataTypes& argument_types) {
+        JNIEnv* env = nullptr;
+        RETURN_NOT_OK_STATUS_WITH_WARN(JniUtil::GetJNIEnv(&env), "Java-Udaf add function");
+        for (int arg_idx = 0; arg_idx < argument_size; ++arg_idx) {
+            auto data_col = columns[arg_idx];
+            if (auto* nullable = check_and_get_column<const ColumnNullable>(*columns[arg_idx])) {
+                data_col = nullable->get_nested_column_ptr();
+                auto null_col = check_and_get_column<ColumnVector<UInt8>>(
+                        nullable->get_null_map_column_ptr());
+                input_nulls_buffer_ptr.get()[arg_idx] =
+                        reinterpret_cast<int64_t>(null_col->get_data().data());
+            } else {
+                input_nulls_buffer_ptr.get()[arg_idx] = -1;
+            }
+            if (data_col->is_column_string()) {
+                const ColumnString* str_col = check_and_get_column<ColumnString>(data_col);
+                input_values_buffer_ptr.get()[arg_idx] =
+                        reinterpret_cast<int64_t>(str_col->get_chars().data());
+                input_offsets_ptrs.get()[arg_idx] =
+                        reinterpret_cast<int64_t>(str_col->get_offsets().data());
+            } else if (data_col->is_numeric() || data_col->is_column_decimal()) {
+                input_values_buffer_ptr.get()[arg_idx] =
+                        reinterpret_cast<int64_t>(data_col->get_raw_data().data);
+            } else {
+                return Status::InvalidArgument(
+                        strings::Substitute("Java UDAF doesn't support type is $0 now !",
+                                            argument_types[arg_idx]->get_name()));
+            }
+        }
+        env->CallNonvirtualVoidMethod(executor_obj, executor_cl, executor_add_id, row_num_start,
+                                      row_num_end);
+        return JniUtil::GetJniExceptionMsg(env);
+    }
+
+    Status merge(const AggregateJavaUdafData& rhs) {
+        JNIEnv* env = nullptr;
+        RETURN_NOT_OK_STATUS_WITH_WARN(JniUtil::GetJNIEnv(&env), "Java-Udaf merge function");
+        serialize_data = rhs.serialize_data;
+        long len = serialize_data.length();
+        jbyteArray arr = env->NewByteArray(len);
+        env->SetByteArrayRegion(arr, 0, len, reinterpret_cast<jbyte*>(serialize_data.data()));
+        env->CallNonvirtualVoidMethod(executor_obj, executor_cl, executor_merge_id, arr);
+        return JniUtil::GetJniExceptionMsg(env);
+    }
+
+    Status write(BufferWritable& buf) {
+        write_binary(first_init, buf);
+        JNIEnv* env = nullptr;
+        RETURN_NOT_OK_STATUS_WITH_WARN(JniUtil::GetJNIEnv(&env), "Java-Udaf write function");
+        // TODO: Here get a byte[] from FE serialize, and then allocate the same length bytes to
+        // save it in BE, Because i'm not sure there is a way to use the byte[] not allocate again.
+        jbyteArray arr = (jbyteArray)(env->CallNonvirtualObjectMethod(executor_obj, executor_cl,
+                                                                      executor_serialize_id));
+        int len = env->GetArrayLength(arr);
+        serialize_data.resize(len);
+        env->GetByteArrayRegion(arr, 0, len, reinterpret_cast<jbyte*>(serialize_data.data()));
+        write_binary(serialize_data, buf);
+        return JniUtil::GetJniExceptionMsg(env);
+    }
+
+    void read(BufferReadable& buf) {
+        read_binary(first_init, buf);
+        read_binary(serialize_data, buf);
+    }
+
+    Status get(IColumn& to, const DataTypePtr& result_type) const {
+        to.insert_default();
+        JNIEnv* env = nullptr;
+        RETURN_NOT_OK_STATUS_WITH_WARN(JniUtil::GetJNIEnv(&env), "Java-Udaf get value function");
+        if (result_type->is_nullable()) {
+            auto& nullable = assert_cast<ColumnNullable&>(to);
+            *output_null_value =
+                    reinterpret_cast<int64_t>(nullable.get_null_map_column().get_raw_data().data);
+            auto& data_col = nullable.get_nested_column();
+
+#ifndef EVALUATE_JAVA_UDAF
+#define EVALUATE_JAVA_UDAF                                                                        \
+    if (data_col.is_column_string()) {                                                            \
+        const ColumnString* str_col = check_and_get_column<ColumnString>(data_col);               \
+        ColumnString::Chars& chars = const_cast<ColumnString::Chars&>(str_col->get_chars());      \
+        ColumnString::Offsets& offsets =                                                          \
+                const_cast<ColumnString::Offsets&>(str_col->get_offsets());                       \
+        int increase_buffer_size = 0;                                                             \
+        *output_value_buffer = reinterpret_cast<int64_t>(chars.data());                           \
+        *output_offsets_ptr = reinterpret_cast<int64_t>(offsets.data());                          \
+        *output_intermediate_state_ptr = chars.size();                                            \
+        jboolean res = env->CallNonvirtualBooleanMethod(executor_obj, executor_cl,                \
+                                                        executor_result_id, to.size() - 1);       \
+        while (res != JNI_TRUE) {                                                                 \
+            int32_t buffer_size = JniUtil::IncreaseReservedBufferSize(increase_buffer_size);      \
+            increase_buffer_size++;                                                               \
+            chars.reserve(chars.size() + buffer_size);                                            \
+            chars.resize(chars.size() + buffer_size);                                             \
+            *output_intermediate_state_ptr = chars.size();                                        \
+            res = env->CallNonvirtualBooleanMethod(executor_obj, executor_cl, executor_result_id, \
+                                                   to.size() - 1);                                \
+        }                                                                                         \
+    } else if (data_col.is_numeric() || data_col.is_column_decimal()) {                           \
+        *output_value_buffer = reinterpret_cast<int64_t>(data_col.get_raw_data().data);           \
+        env->CallNonvirtualBooleanMethod(executor_obj, executor_cl, executor_result_id,           \
+                                         to.size() - 1);                                          \
+    } else {                                                                                      \
+        return Status::InvalidArgument(strings::Substitute(                                       \
+                "Java UDAF doesn't support return type is $0 now !", result_type->get_name()));   \
+    }
+#endif
+            EVALUATE_JAVA_UDAF;
+        } else {
+            *output_null_value = -1;
+            *output_value_buffer = reinterpret_cast<int64_t>(to.get_raw_data().data);
+            auto& data_col = to;
+            EVALUATE_JAVA_UDAF;
+            env->CallNonvirtualBooleanMethod(executor_obj, executor_cl, executor_result_id,
+                                             to.size() - 1);
+        }
+        return JniUtil::GetJniExceptionMsg(env);
+    }
+
+private:
+    Status register_func_id(JNIEnv* env) {
+        auto register_id = [&](const char* func_name, const char* func_sign, jmethodID& func_id) {
+            func_id = env->GetMethodID(executor_cl, func_name, func_sign);
+            Status s = JniUtil::GetJniExceptionMsg(env);
+            if (!s.ok()) {
+                return Status::InternalError(
+                        strings::Substitute("Java-Udaf register_func_id meet error and error is $0",
+                                            s.get_error_msg()));
+            }
+            return s;
+        };
+
+        RETURN_IF_ERROR(register_id("<init>", UDAF_EXECUTOR_CTOR_SIGNATURE, executor_ctor_id));
+        RETURN_IF_ERROR(register_id("add", UDAF_EXECUTOR_ADD_SIGNATURE, executor_add_id));
+        RETURN_IF_ERROR(register_id("close", UDAF_EXECUTOR_CLOSE_SIGNATURE, executor_close_id));
+        RETURN_IF_ERROR(register_id("merge", UDAF_EXECUTOR_MERGE_SIGNATURE, executor_merge_id));
+        RETURN_IF_ERROR(
+                register_id("serialize", UDAF_EXECUTOR_SERIALIZE_SIGNATURE, executor_serialize_id));
+        RETURN_IF_ERROR(
+                register_id("getValue", UDAF_EXECUTOR_RESULT_SIGNATURE, executor_result_id));
+        return Status::OK();
+    }
+
+private:
+    jclass executor_cl;
+    jobject executor_obj;
+    jmethodID executor_ctor_id;
+
+    jmethodID executor_add_id;
+    jmethodID executor_merge_id;
+    jmethodID executor_serialize_id;
+    jmethodID executor_result_id;
+    jmethodID executor_close_id;
+
+    std::unique_ptr<int64_t[]> input_values_buffer_ptr;
+    std::unique_ptr<int64_t[]> input_nulls_buffer_ptr;
+    std::unique_ptr<int64_t[]> input_offsets_ptrs;
+    std::unique_ptr<int64_t> output_value_buffer;
+    std::unique_ptr<int64_t> output_null_value;
+    std::unique_ptr<int64_t> output_offsets_ptr;
+    std::unique_ptr<int64_t> output_intermediate_state_ptr;
+
+    bool first_init;
+    int argument_size = 0;
+    std::string serialize_data;
+};
+
+class AggregateJavaUdaf final
+        : public IAggregateFunctionDataHelper<AggregateJavaUdafData, AggregateJavaUdaf> {
+public:
+    AggregateJavaUdaf(const TFunction& fn, const DataTypes& argument_types, const Array& parameters,
+                      const DataTypePtr& return_type)
+            : IAggregateFunctionDataHelper(argument_types, parameters),
+              _fn(fn),
+              _return_type(return_type) {}
+    ~AggregateJavaUdaf() = default;
+
+    static AggregateFunctionPtr create(const TFunction& fn, const DataTypes& argument_types,
+                                       const Array& parameters, const DataTypePtr& return_type) {
+        return std::make_shared<AggregateJavaUdaf>(fn, argument_types, parameters, return_type);
+    }
+
+    void create(AggregateDataPtr __restrict place) const override {
+        new (place) Data(argument_types.size());
+        Status status = Status::OK();
+        RETURN_IF_STATUS_ERROR(status, data(place).init_udaf(_fn));
+    }
+
+    String get_name() const override { return _fn.name.function_name; }
+
+    DataTypePtr get_return_type() const override { return _return_type; }
+
+    // TODO: here calling add operator maybe only hava done one row, this performance may be poorly
+    // so it's possible to maintain a hashtable in FE, the key is place address, value is the object
+    // then we can calling add_bacth function and calculate the whole batch at once,
+    // and avoid calling jni multiple times.
+    void add(AggregateDataPtr __restrict place, const IColumn** columns, size_t row_num,
+             Arena*) const override {
+        this->data(place).add(columns, row_num, row_num + 1, argument_types);
+    }
+
+    // TODO: Here we calling method by jni, And if we get a thrown from FE,
+    // But can't let user known the error, only return directly and output error to log file.
+    void add_batch_single_place(size_t batch_size, AggregateDataPtr place, const IColumn** columns,
+                                Arena* arena) const override {
+        Status status = Status::OK();
+        RETURN_IF_STATUS_ERROR(status, data(place).init_udaf(_fn));

Review Comment:
   Is it necessary to call `init_udaf` every time `add_batch_single_place` is called? It seems you already call it in `create`.



##########
be/src/vec/aggregate_functions/aggregate_function_java_udaf.h:
##########
@@ -0,0 +1,358 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#ifdef LIBJVM
+
+#include <jni.h>
+#include <unistd.h>
+
+#include <cstdint>
+#include <memory>
+
+#include "common/status.h"
+#include "gen_cpp/Exprs_types.h"
+#include "runtime/user_function_cache.h"
+#include "util/jni-util.h"
+#include "vec/aggregate_functions/aggregate_function.h"
+#include "vec/columns/column_string.h"
+#include "vec/common/exception.h"
+#include "vec/common/string_ref.h"
+#include "vec/core/block.h"
+#include "vec/core/column_numbers.h"
+#include "vec/core/field.h"
+#include "vec/core/types.h"
+#include "vec/data_types/data_type_string.h"
+#include "vec/io/io_helper.h"
+
+namespace doris::vectorized {
+
+const char* UDAF_EXECUTOR_CLASS = "org/apache/doris/udf/UdafExecutor";
+const char* UDAF_EXECUTOR_CTOR_SIGNATURE = "([B)V";
+const char* UDAF_EXECUTOR_CLOSE_SIGNATURE = "()V";
+const char* UDAF_EXECUTOR_ADD_SIGNATURE = "(JJ)V";
+const char* UDAF_EXECUTOR_SERIALIZE_SIGNATURE = "()[B";
+const char* UDAF_EXECUTOR_MERGE_SIGNATURE = "([B)V";
+const char* UDAF_EXECUTOR_RESULT_SIGNATURE = "(J)Z";
+// Calling Java method about those signture means: "(argument-types)return-type"
+// https://www.iitk.ac.in/esc101/05Aug/tutorial/native1.1/implementing/method.html
+
+struct AggregateJavaUdafData {
+public:
+    AggregateJavaUdafData() = default;
+    AggregateJavaUdafData(int64_t num_args) {
+        argument_size = num_args;
+        first_init = true;
+        input_values_buffer_ptr.reset(new int64_t[num_args]);
+        input_nulls_buffer_ptr.reset(new int64_t[num_args]);
+        input_offsets_ptrs.reset(new int64_t[num_args]);
+        output_value_buffer.reset(new int64_t);
+        output_null_value.reset(new int64_t);
+        output_offsets_ptr.reset(new int64_t);
+        output_intermediate_state_ptr.reset(new int64_t);
+    }
+
+    ~AggregateJavaUdafData() {
+        JNIEnv* env;
+        Status status;
+        RETURN_IF_STATUS_ERROR(status, JniUtil::GetJNIEnv(&env));
+        env->CallNonvirtualVoidMethod(executor_obj, executor_cl, executor_close_id);
+        RETURN_IF_STATUS_ERROR(status, JniUtil::GetJniExceptionMsg(env));
+        env->DeleteGlobalRef(executor_obj);
+    }
+
+    Status init_udaf(const TFunction& fn) {
+        if (first_init) {
+            JNIEnv* env = nullptr;
+            RETURN_NOT_OK_STATUS_WITH_WARN(JniUtil::GetJNIEnv(&env),
+                                           "Java-Udaf init_udaf function");
+            RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, UDAF_EXECUTOR_CLASS, &executor_cl));
+            RETURN_NOT_OK_STATUS_WITH_WARN(register_func_id(env),
+                                           "Java-Udaf register_func_id function");
+
+            // Add a scoped cleanup jni reference object. This cleans up local refs made below.
+            JniLocalFrame jni_frame;
+            {
+                std::string local_location;
+                auto function_cache = UserFunctionCache::instance();
+                RETURN_IF_ERROR(function_cache->get_jarpath(fn.id, fn.hdfs_location, fn.checksum,
+                                                            &local_location));
+                TJavaUdfExecutorCtorParams ctor_params;
+                ctor_params.__set_fn(fn);
+                ctor_params.__set_location(local_location);
+                ctor_params.__set_input_offsets_ptrs((int64_t)input_offsets_ptrs.get());
+                ctor_params.__set_input_buffer_ptrs((int64_t)input_values_buffer_ptr.get());
+                ctor_params.__set_input_nulls_ptrs((int64_t)input_nulls_buffer_ptr.get());
+                ctor_params.__set_output_buffer_ptr((int64_t)output_value_buffer.get());
+
+                ctor_params.__set_output_null_ptr((int64_t)output_null_value.get());
+                ctor_params.__set_output_offsets_ptr((int64_t)output_offsets_ptr.get());
+                ctor_params.__set_output_intermediate_state_ptr(
+                        (int64_t)output_intermediate_state_ptr.get());
+
+                jbyteArray ctor_params_bytes;
+
+                // Pushed frame will be popped when jni_frame goes out-of-scope.
+                RETURN_IF_ERROR(jni_frame.push(env));
+                RETURN_IF_ERROR(SerializeThriftMsg(env, &ctor_params, &ctor_params_bytes));
+                executor_obj = env->NewObject(executor_cl, executor_ctor_id, ctor_params_bytes);
+            }
+            RETURN_ERROR_IF_EXC(env);
+            RETURN_IF_ERROR(JniUtil::LocalToGlobalRef(env, executor_obj, &executor_obj));
+            first_init = false;
+        }
+        return Status::OK();
+    }
+
+    Status add(const IColumn** columns, size_t row_num_start, size_t row_num_end,
+               const DataTypes& argument_types) {
+        JNIEnv* env = nullptr;
+        RETURN_NOT_OK_STATUS_WITH_WARN(JniUtil::GetJNIEnv(&env), "Java-Udaf add function");
+        for (int arg_idx = 0; arg_idx < argument_size; ++arg_idx) {
+            auto data_col = columns[arg_idx];
+            if (auto* nullable = check_and_get_column<const ColumnNullable>(*columns[arg_idx])) {
+                data_col = nullable->get_nested_column_ptr();
+                auto null_col = check_and_get_column<ColumnVector<UInt8>>(
+                        nullable->get_null_map_column_ptr());
+                input_nulls_buffer_ptr.get()[arg_idx] =
+                        reinterpret_cast<int64_t>(null_col->get_data().data());
+            } else {
+                input_nulls_buffer_ptr.get()[arg_idx] = -1;
+            }
+            if (data_col->is_column_string()) {
+                const ColumnString* str_col = check_and_get_column<ColumnString>(data_col);
+                input_values_buffer_ptr.get()[arg_idx] =
+                        reinterpret_cast<int64_t>(str_col->get_chars().data());
+                input_offsets_ptrs.get()[arg_idx] =
+                        reinterpret_cast<int64_t>(str_col->get_offsets().data());
+            } else if (data_col->is_numeric() || data_col->is_column_decimal()) {
+                input_values_buffer_ptr.get()[arg_idx] =
+                        reinterpret_cast<int64_t>(data_col->get_raw_data().data);
+            } else {
+                return Status::InvalidArgument(
+                        strings::Substitute("Java UDAF doesn't support type is $0 now !",
+                                            argument_types[arg_idx]->get_name()));
+            }
+        }
+        env->CallNonvirtualVoidMethod(executor_obj, executor_cl, executor_add_id, row_num_start,
+                                      row_num_end);
+        return JniUtil::GetJniExceptionMsg(env);
+    }
+
+    Status merge(const AggregateJavaUdafData& rhs) {
+        JNIEnv* env = nullptr;
+        RETURN_NOT_OK_STATUS_WITH_WARN(JniUtil::GetJNIEnv(&env), "Java-Udaf merge function");
+        serialize_data = rhs.serialize_data;
+        long len = serialize_data.length();
+        jbyteArray arr = env->NewByteArray(len);
+        env->SetByteArrayRegion(arr, 0, len, reinterpret_cast<jbyte*>(serialize_data.data()));
+        env->CallNonvirtualVoidMethod(executor_obj, executor_cl, executor_merge_id, arr);
+        return JniUtil::GetJniExceptionMsg(env);
+    }
+
+    Status write(BufferWritable& buf) {
+        write_binary(first_init, buf);
+        JNIEnv* env = nullptr;
+        RETURN_NOT_OK_STATUS_WITH_WARN(JniUtil::GetJNIEnv(&env), "Java-Udaf write function");
+        // TODO: Here get a byte[] from FE serialize, and then allocate the same length bytes to
+        // save it in BE, Because i'm not sure there is a way to use the byte[] not allocate again.
+        jbyteArray arr = (jbyteArray)(env->CallNonvirtualObjectMethod(executor_obj, executor_cl,
+                                                                      executor_serialize_id));
+        int len = env->GetArrayLength(arr);
+        serialize_data.resize(len);
+        env->GetByteArrayRegion(arr, 0, len, reinterpret_cast<jbyte*>(serialize_data.data()));
+        write_binary(serialize_data, buf);
+        return JniUtil::GetJniExceptionMsg(env);
+    }
+
+    void read(BufferReadable& buf) {
+        read_binary(first_init, buf);
+        read_binary(serialize_data, buf);
+    }
+
+    Status get(IColumn& to, const DataTypePtr& result_type) const {
+        to.insert_default();
+        JNIEnv* env = nullptr;
+        RETURN_NOT_OK_STATUS_WITH_WARN(JniUtil::GetJNIEnv(&env), "Java-Udaf get value function");
+        if (result_type->is_nullable()) {
+            auto& nullable = assert_cast<ColumnNullable&>(to);
+            *output_null_value =
+                    reinterpret_cast<int64_t>(nullable.get_null_map_column().get_raw_data().data);
+            auto& data_col = nullable.get_nested_column();
+
+#ifndef EVALUATE_JAVA_UDAF
+#define EVALUATE_JAVA_UDAF                                                                        \
+    if (data_col.is_column_string()) {                                                            \
+        const ColumnString* str_col = check_and_get_column<ColumnString>(data_col);               \
+        ColumnString::Chars& chars = const_cast<ColumnString::Chars&>(str_col->get_chars());      \
+        ColumnString::Offsets& offsets =                                                          \
+                const_cast<ColumnString::Offsets&>(str_col->get_offsets());                       \
+        int increase_buffer_size = 0;                                                             \
+        *output_value_buffer = reinterpret_cast<int64_t>(chars.data());                           \
+        *output_offsets_ptr = reinterpret_cast<int64_t>(offsets.data());                          \
+        *output_intermediate_state_ptr = chars.size();                                            \
+        jboolean res = env->CallNonvirtualBooleanMethod(executor_obj, executor_cl,                \
+                                                        executor_result_id, to.size() - 1);       \
+        while (res != JNI_TRUE) {                                                                 \
+            int32_t buffer_size = JniUtil::IncreaseReservedBufferSize(increase_buffer_size);      \
+            increase_buffer_size++;                                                               \
+            chars.reserve(chars.size() + buffer_size);                                            \
+            chars.resize(chars.size() + buffer_size);                                             \
+            *output_intermediate_state_ptr = chars.size();                                        \
+            res = env->CallNonvirtualBooleanMethod(executor_obj, executor_cl, executor_result_id, \
+                                                   to.size() - 1);                                \
+        }                                                                                         \
+    } else if (data_col.is_numeric() || data_col.is_column_decimal()) {                           \
+        *output_value_buffer = reinterpret_cast<int64_t>(data_col.get_raw_data().data);           \
+        env->CallNonvirtualBooleanMethod(executor_obj, executor_cl, executor_result_id,           \
+                                         to.size() - 1);                                          \
+    } else {                                                                                      \
+        return Status::InvalidArgument(strings::Substitute(                                       \
+                "Java UDAF doesn't support return type is $0 now !", result_type->get_name()));   \
+    }
+#endif
+            EVALUATE_JAVA_UDAF;
+        } else {
+            *output_null_value = -1;
+            *output_value_buffer = reinterpret_cast<int64_t>(to.get_raw_data().data);
+            auto& data_col = to;
+            EVALUATE_JAVA_UDAF;
+            env->CallNonvirtualBooleanMethod(executor_obj, executor_cl, executor_result_id,
+                                             to.size() - 1);
+        }
+        return JniUtil::GetJniExceptionMsg(env);
+    }
+
+private:
+    Status register_func_id(JNIEnv* env) {
+        auto register_id = [&](const char* func_name, const char* func_sign, jmethodID& func_id) {
+            func_id = env->GetMethodID(executor_cl, func_name, func_sign);
+            Status s = JniUtil::GetJniExceptionMsg(env);
+            if (!s.ok()) {
+                return Status::InternalError(
+                        strings::Substitute("Java-Udaf register_func_id meet error and error is $0",
+                                            s.get_error_msg()));
+            }
+            return s;
+        };
+
+        RETURN_IF_ERROR(register_id("<init>", UDAF_EXECUTOR_CTOR_SIGNATURE, executor_ctor_id));
+        RETURN_IF_ERROR(register_id("add", UDAF_EXECUTOR_ADD_SIGNATURE, executor_add_id));
+        RETURN_IF_ERROR(register_id("close", UDAF_EXECUTOR_CLOSE_SIGNATURE, executor_close_id));
+        RETURN_IF_ERROR(register_id("merge", UDAF_EXECUTOR_MERGE_SIGNATURE, executor_merge_id));
+        RETURN_IF_ERROR(
+                register_id("serialize", UDAF_EXECUTOR_SERIALIZE_SIGNATURE, executor_serialize_id));
+        RETURN_IF_ERROR(
+                register_id("getValue", UDAF_EXECUTOR_RESULT_SIGNATURE, executor_result_id));
+        return Status::OK();
+    }
+
+private:
+    jclass executor_cl;
+    jobject executor_obj;
+    jmethodID executor_ctor_id;
+
+    jmethodID executor_add_id;
+    jmethodID executor_merge_id;
+    jmethodID executor_serialize_id;
+    jmethodID executor_result_id;
+    jmethodID executor_close_id;
+
+    std::unique_ptr<int64_t[]> input_values_buffer_ptr;
+    std::unique_ptr<int64_t[]> input_nulls_buffer_ptr;
+    std::unique_ptr<int64_t[]> input_offsets_ptrs;
+    std::unique_ptr<int64_t> output_value_buffer;
+    std::unique_ptr<int64_t> output_null_value;
+    std::unique_ptr<int64_t> output_offsets_ptr;
+    std::unique_ptr<int64_t> output_intermediate_state_ptr;
+
+    bool first_init;
+    int argument_size = 0;
+    std::string serialize_data;
+};
+
+class AggregateJavaUdaf final
+        : public IAggregateFunctionDataHelper<AggregateJavaUdafData, AggregateJavaUdaf> {
+public:
+    AggregateJavaUdaf(const TFunction& fn, const DataTypes& argument_types, const Array& parameters,
+                      const DataTypePtr& return_type)
+            : IAggregateFunctionDataHelper(argument_types, parameters),
+              _fn(fn),
+              _return_type(return_type) {}
+    ~AggregateJavaUdaf() = default;
+
+    static AggregateFunctionPtr create(const TFunction& fn, const DataTypes& argument_types,
+                                       const Array& parameters, const DataTypePtr& return_type) {
+        return std::make_shared<AggregateJavaUdaf>(fn, argument_types, parameters, return_type);
+    }
+
+    void create(AggregateDataPtr __restrict place) const override {
+        new (place) Data(argument_types.size());
+        Status status = Status::OK();
+        RETURN_IF_STATUS_ERROR(status, data(place).init_udaf(_fn));
+    }
+
+    String get_name() const override { return _fn.name.function_name; }
+
+    DataTypePtr get_return_type() const override { return _return_type; }
+
+    // TODO: here calling add operator maybe only hava done one row, this performance may be poorly
+    // so it's possible to maintain a hashtable in FE, the key is place address, value is the object
+    // then we can calling add_bacth function and calculate the whole batch at once,
+    // and avoid calling jni multiple times.
+    void add(AggregateDataPtr __restrict place, const IColumn** columns, size_t row_num,
+             Arena*) const override {
+        this->data(place).add(columns, row_num, row_num + 1, argument_types);
+    }
+
+    // TODO: Here we calling method by jni, And if we get a thrown from FE,
+    // But can't let user known the error, only return directly and output error to log file.
+    void add_batch_single_place(size_t batch_size, AggregateDataPtr place, const IColumn** columns,
+                                Arena* arena) const override {
+        Status status = Status::OK();
+        RETURN_IF_STATUS_ERROR(status, data(place).init_udaf(_fn));

Review Comment:
   Use `RETURN_IF_ERROR` here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] BiteTheDDDDt commented on a diff in pull request #9930: [Vectorized][UDF] support java-udaf

Posted by GitBox <gi...@apache.org>.
BiteTheDDDDt commented on code in PR #9930:
URL: https://github.com/apache/incubator-doris/pull/9930#discussion_r887636880


##########
be/src/vec/aggregate_functions/aggregate_function_java_udaf.h:
##########
@@ -0,0 +1,392 @@
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#ifdef LIBJVM
+
+#include <jni.h>
+#include <unistd.h>
+
+#include <cstdint>
+#include <memory>
+
+#include "common/status.h"
+#include "gen_cpp/Exprs_types.h"
+#include "runtime/user_function_cache.h"
+#include "util/jni-util.h"
+#include "vec/aggregate_functions/aggregate_function.h"
+#include "vec/columns/column_string.h"
+#include "vec/common/exception.h"
+#include "vec/common/string_ref.h"
+#include "vec/core/block.h"
+#include "vec/core/column_numbers.h"
+#include "vec/core/field.h"
+#include "vec/core/types.h"
+#include "vec/data_types/data_type_string.h"
+#include "vec/io/io_helper.h"
+
+namespace doris::vectorized {
+
+// Java executor class name and JNI method signatures ("(argument-types)return-type").
+// `constexpr` makes each pointer itself const, which gives it internal linkage. A plain
+// `const char*` defined in a header is a mutable global with external linkage, so it would
+// be defined again in every translation unit including this file (ODR / link errors).
+constexpr const char* UDAF_EXECUTOR_CLASS = "org/apache/doris/udf/UdafExecutor";
+constexpr const char* UDAF_EXECUTOR_CTOR_SIGNATURE = "([B)V";
+constexpr const char* UDAF_EXECUTOR_CLOSE_SIGNATURE = "()V";
+constexpr const char* UDAF_EXECUTOR_CREATE_SIGNATURE = "()Ljava/lang/Object;";
+constexpr const char* UDAF_EXECUTOR_DESTORY_SIGNATURE = "()V";
+constexpr const char* UDAF_EXECUTOR_ADD_SIGNATURE = "(J)V";
+constexpr const char* UDAF_EXECUTOR_MERGE_SIGNATURE = "(Ljava/lang/Object;)V";
+constexpr const char* UDAF_EXECUTOR_SERIALIZE_SIGNATURE = "(Ljava/lang/Object;)V";
+constexpr const char* UDAF_EXECUTOR_DESERIALIZE_SIGNATURE = "(Ljava/lang/Object;)V";
+constexpr const char* UDAF_EXECUTOR_RESULT_SIGNATURE = "(J)Z";
+
+struct AggregateJavaUdafData {
+public:
+    // Default construction allocates no exchange slots; flag the state as not yet
+    // initialized so that init_udaf() creates the executor on first use instead of
+    // reading an indeterminate flag.
+    AggregateJavaUdafData() { first_init = true; }
+
+    // Allocates one input slot per argument (values, null map, string offsets) plus single
+    // output and batch-size slots; init_udaf() hands their addresses to the Java executor.
+    // The trailing "()" value-initializes the slots so unused entries read as 0.
+    AggregateJavaUdafData(int64_t num_args) {
+        argument_size = num_args;
+        first_init = true;
+        input_values_buffer_ptr.reset(new int64_t[num_args]());
+        input_nulls_buffer_ptr.reset(new int64_t[num_args]());
+        input_offsets_ptrs.reset(new int64_t[num_args]());
+        output_value_buffer.reset(new int64_t());
+        output_null_value.reset(new int64_t());
+        batch_size_ptr.reset(new int32_t());
+        output_offsets_ptr.reset(new int64_t());
+        output_intermediate_state_ptr.reset(new int64_t());
+    }
+
+    // Closes the Java executor and releases its global reference.
+    // executor_obj is only assigned by init_udaf(), which clears first_init on success; a
+    // state still flagged first_init never created an executor, and calling into JNI with
+    // the unassigned handle would be undefined behavior, so such states return at once.
+    // Close errors are only logged because a destructor cannot return a Status.
+    ~AggregateJavaUdafData() {
+        if (first_init) {
+            return;
+        }
+        JNIEnv* env;
+        Status status;
+        RETURN_IF_STATUS_ERROR(status, JniUtil::GetJNIEnv(&env));
+        env->CallNonvirtualVoidMethod(executor_obj, executor_cl_, executor_close_id_);
+        Status s = JniUtil::GetJniExceptionMsg(env);
+        if (!s.ok()) {
+            LOG(WARNING) << "meet some error in destroy: " << s.get_error_msg();
+        }
+        env->DeleteGlobalRef(executor_obj);
+    }
+
+    // Creates this state's Java executor on the first call; later calls return OK at once.
+    // Returns a non-OK Status if the JVM env, the executor class, its method IDs, the UDF
+    // jar path or the executor construction cannot be obtained.
+    Status init_udaf(const TFunction& fn) {
+        if (first_init) {
+            JNIEnv* env = nullptr;
+            RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env));
+            if (env == nullptr) {
+                return Status::InternalError("Failed to get/create JVM");
+            }
+            RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, UDAF_EXECUTOR_CLASS, &executor_cl_));
+
+            Status ret_code = register_func_id(env);
+            if (!ret_code.ok()) {
+                // If registration failed the method IDs cannot be trusted; constructing the
+                // executor with them is invalid, so propagate the error instead of only logging.
+                LOG(WARNING) << "register_func_id has error : " << ret_code.get_error_msg();
+                return ret_code;
+            }
+
+            // Add a scoped cleanup jni reference object. This cleans up local refs made below.
+            JniLocalFrame jni_frame;
+            jobject local_executor = nullptr;
+            {
+                std::string local_location;
+                auto function_cache = UserFunctionCache::instance();
+                RETURN_IF_ERROR(function_cache->get_jarpath(fn.id, fn.hdfs_location, fn.checksum,
+                                                            &local_location));
+                TJavaUdfExecutorCtorParams ctor_params;
+                ctor_params.__set_fn(fn);
+                ctor_params.__set_location(local_location);
+                ctor_params.__set_input_offsets_ptrs((int64_t)input_offsets_ptrs.get());
+                ctor_params.__set_input_buffer_ptrs((int64_t)input_values_buffer_ptr.get());
+                ctor_params.__set_input_nulls_ptrs((int64_t)input_nulls_buffer_ptr.get());
+                ctor_params.__set_output_buffer_ptr((int64_t)output_value_buffer.get());
+
+                ctor_params.__set_output_null_ptr((int64_t)output_null_value.get());
+                ctor_params.__set_output_offsets_ptr((int64_t)output_offsets_ptr.get());
+                ctor_params.__set_output_intermediate_state_ptr(
+                        (int64_t)output_intermediate_state_ptr.get());
+                ctor_params.__set_batch_size_ptr((int64_t)batch_size_ptr.get());
+
+                jbyteArray ctor_params_bytes;
+
+                // Pushed frame will be popped when jni_frame goes out-of-scope.
+                RETURN_IF_ERROR(jni_frame.push(env));
+                RETURN_IF_ERROR(SerializeThriftMsg(env, &ctor_params, &ctor_params_bytes));
+                local_executor = env->NewObject(executor_cl_, executor_ctor_id_, ctor_params_bytes);
+            }
+            RETURN_ERROR_IF_EXC(env);
+            // Promote to a global reference before jni_frame pops the local one. executor_obj
+            // only ever receives the global reference, never the frame-owned local one.
+            RETURN_IF_ERROR(JniUtil::LocalToGlobalRef(env, local_executor, &executor_obj));
+            first_init = false;
+        }
+        return Status::OK();
+    }
+
+    void add(const IColumn** columns, size_t row_num, const DataTypes& argument_types) const {
+        JNIEnv* env = nullptr;

Review Comment:
   We could wrap this in a helper, e.g. `Status get_env(JNIEnv** env, const std::string& when)` (the env has to be returned through a `JNIEnv**`).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] zhangstar333 commented on a diff in pull request #9930: [Vectorized][UDF] support java-udaf

Posted by GitBox <gi...@apache.org>.
zhangstar333 commented on code in PR #9930:
URL: https://github.com/apache/incubator-doris/pull/9930#discussion_r893032637


##########
be/src/vec/aggregate_functions/aggregate_function_java_udaf.h:
##########
@@ -0,0 +1,361 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#ifdef LIBJVM
+
+#include <jni.h>
+#include <unistd.h>
+
+#include <cstdint>
+#include <memory>
+
+#include "common/status.h"
+#include "gen_cpp/Exprs_types.h"
+#include "runtime/user_function_cache.h"
+#include "util/jni-util.h"
+#include "vec/aggregate_functions/aggregate_function.h"
+#include "vec/columns/column_string.h"
+#include "vec/common/exception.h"
+#include "vec/common/string_ref.h"
+#include "vec/core/block.h"
+#include "vec/core/column_numbers.h"
+#include "vec/core/field.h"
+#include "vec/core/types.h"
+#include "vec/data_types/data_type_string.h"
+#include "vec/io/io_helper.h"
+
+namespace doris::vectorized {
+
+// Java executor class name and JNI method signatures ("(argument-types)return-type").
+// `constexpr` makes each pointer itself const, which gives it internal linkage. A plain
+// `const char*` defined in a header is a mutable global with external linkage, so it would
+// be defined again in every translation unit including this file (ODR / link errors).
+constexpr const char* UDAF_EXECUTOR_CLASS = "org/apache/doris/udf/UdafExecutor";
+constexpr const char* UDAF_EXECUTOR_CTOR_SIGNATURE = "([B)V";
+constexpr const char* UDAF_EXECUTOR_CLOSE_SIGNATURE = "()V";
+constexpr const char* UDAF_EXECUTOR_ADD_SIGNATURE = "(JJ)V";
+constexpr const char* UDAF_EXECUTOR_SERIALIZE_SIGNATURE = "(Ljava/lang/Object;)V";
+constexpr const char* UDAF_EXECUTOR_MERGE_SIGNATURE = "(Ljava/lang/Object;)V";
+constexpr const char* UDAF_EXECUTOR_RESULT_SIGNATURE = "(J)Z";
+
+// Per-group state of a Java UDAF.
+//
+// Each state creates its own Java UdafExecutor on first use (init_udaf()) and owns a few
+// native slots whose addresses are passed to the executor's constructor; add() and get()
+// store column buffer addresses in those slots before calling into Java. The JNI global
+// references held here (executor_cl, executor_obj) are released by the destructor.
+struct AggregateJavaUdafData {
+public:
+    AggregateJavaUdafData() = default;
+
+    // Allocates one input slot per argument (values, null map, string offsets) and single
+    // output slots. The trailing "()" value-initializes them so unused entries read as 0.
+    AggregateJavaUdafData(int64_t num_args) {
+        argument_size = num_args;
+        first_init = true;
+        input_values_buffer_ptr.reset(new int64_t[num_args]());
+        input_nulls_buffer_ptr.reset(new int64_t[num_args]());
+        input_offsets_ptrs.reset(new int64_t[num_args]());
+        output_value_buffer.reset(new int64_t());
+        output_null_value.reset(new int64_t());
+        output_offsets_ptr.reset(new int64_t());
+        output_intermediate_state_ptr.reset(new int64_t());
+    }
+
+    // Closes the executor and drops the global references this state owns. A state that
+    // never obtained them (both still nullptr) has nothing to release. Errors are only
+    // logged because a destructor cannot return a Status.
+    ~AggregateJavaUdafData() {
+        if (executor_obj == nullptr && executor_cl == nullptr) {
+            return;
+        }
+        JNIEnv* env = nullptr;
+        Status status = JniUtil::GetJNIEnv(&env);
+        if (!status.ok() || env == nullptr) {
+            LOG(WARNING) << "Java-Udaf destructor failed to get JNIEnv: " << status.get_error_msg();
+            return;
+        }
+        if (executor_obj != nullptr) {
+            env->CallNonvirtualVoidMethod(executor_obj, executor_cl, executor_close_id);
+            status = JniUtil::GetJniExceptionMsg(env);
+            if (!status.ok()) {
+                LOG(WARNING) << "Java-Udaf close executor meet error: " << status.get_error_msg();
+            }
+            // Release the reference even when close() threw; returning early would leak it.
+            env->DeleteGlobalRef(executor_obj);
+            executor_obj = nullptr;
+        }
+        // init_udaf() obtained this class reference through JniUtil::GetGlobalClassRef.
+        if (executor_cl != nullptr) {
+            env->DeleteGlobalRef(executor_cl);
+            executor_cl = nullptr;
+        }
+    }
+
+    // Creates this state's Java executor on the first call; later calls return OK at once.
+    // Returns a non-OK Status if the JVM env, the executor class, its method IDs, the UDF
+    // jar path or the executor construction cannot be obtained.
+    Status init_udaf(const TFunction& fn) {
+        if (!first_init) {
+            return Status::OK();
+        }
+        JNIEnv* env = nullptr;
+        RETURN_NOT_OK_STATUS_WITH_WARN(JniUtil::GetJNIEnv(&env), "Java-Udaf init_udaf function");
+        // A previous failed attempt may already hold the class reference; don't leak it.
+        if (executor_cl == nullptr) {
+            RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, UDAF_EXECUTOR_CLASS, &executor_cl));
+        }
+        RETURN_NOT_OK_STATUS_WITH_WARN(register_func_id(env),
+                                       "Java-Udaf register_func_id function");
+
+        // Scoped JNI frame: local references created below are released when it goes out of
+        // scope (it is popped on return from this function).
+        JniLocalFrame jni_frame;
+        std::string local_location;
+        auto function_cache = UserFunctionCache::instance();
+        RETURN_IF_ERROR(function_cache->get_jarpath(fn.id, fn.hdfs_location, fn.checksum,
+                                                    &local_location));
+        TJavaUdfExecutorCtorParams ctor_params;
+        ctor_params.__set_fn(fn);
+        ctor_params.__set_location(local_location);
+        ctor_params.__set_input_offsets_ptrs((int64_t)input_offsets_ptrs.get());
+        ctor_params.__set_input_buffer_ptrs((int64_t)input_values_buffer_ptr.get());
+        ctor_params.__set_input_nulls_ptrs((int64_t)input_nulls_buffer_ptr.get());
+        ctor_params.__set_output_buffer_ptr((int64_t)output_value_buffer.get());
+        ctor_params.__set_output_null_ptr((int64_t)output_null_value.get());
+        ctor_params.__set_output_offsets_ptr((int64_t)output_offsets_ptr.get());
+        ctor_params.__set_output_intermediate_state_ptr(
+                (int64_t)output_intermediate_state_ptr.get());
+
+        jbyteArray ctor_params_bytes;
+        RETURN_IF_ERROR(jni_frame.push(env));
+        RETURN_IF_ERROR(SerializeThriftMsg(env, &ctor_params, &ctor_params_bytes));
+        jobject local_executor = env->NewObject(executor_cl, executor_ctor_id, ctor_params_bytes);
+        RETURN_ERROR_IF_EXC(env);
+        // Promote to a global reference before jni_frame pops the local one. executor_obj stays
+        // nullptr unless this succeeds, so the destructor never deletes a local reference.
+        RETURN_IF_ERROR(JniUtil::LocalToGlobalRef(env, local_executor, &executor_obj));
+        first_init = false;
+        return Status::OK();
+    }
+
+    // Publishes every argument column's buffer addresses (values, null map, string offsets)
+    // in the shared slots, then calls the executor's add with row_num_start and row_num_end
+    // (forwarded unchanged; presumably a half-open range, confirm against UdafExecutor.add).
+    // Returns InvalidArgument for unsupported argument types, or the Java error if add throws.
+    Status add(const IColumn** columns, size_t row_num_start, size_t row_num_end,
+               const DataTypes& argument_types) {
+        JNIEnv* env = nullptr;
+        RETURN_NOT_OK_STATUS_WITH_WARN(JniUtil::GetJNIEnv(&env), "Java-Udaf add function");
+        for (int arg_idx = 0; arg_idx < argument_size; ++arg_idx) {
+            auto data_col = columns[arg_idx];
+            if (auto* nullable = check_and_get_column<const ColumnNullable>(*columns[arg_idx])) {
+                data_col = nullable->get_nested_column_ptr();
+                auto null_col = check_and_get_column<ColumnVector<UInt8>>(
+                        nullable->get_null_map_column_ptr());
+                input_nulls_buffer_ptr.get()[arg_idx] =
+                        reinterpret_cast<int64_t>(null_col->get_data().data());
+            } else {
+                // -1: this argument column is not nullable.
+                input_nulls_buffer_ptr.get()[arg_idx] = -1;
+            }
+            if (data_col->is_column_string()) {
+                const ColumnString* str_col = check_and_get_column<ColumnString>(data_col);
+                input_values_buffer_ptr.get()[arg_idx] =
+                        reinterpret_cast<int64_t>(str_col->get_chars().data());
+                input_offsets_ptrs.get()[arg_idx] =
+                        reinterpret_cast<int64_t>(str_col->get_offsets().data());
+            } else if (data_col->is_numeric() || data_col->is_column_decimal()) {
+                input_values_buffer_ptr.get()[arg_idx] =
+                        reinterpret_cast<int64_t>(data_col->get_raw_data().data);
+            } else {
+                return Status::InvalidArgument(
+                        strings::Substitute("Java UDAF doesn't support type is $0 now !",
+                                            argument_types[arg_idx]->get_name()));
+            }
+        }
+        env->CallNonvirtualVoidMethod(executor_obj, executor_cl, executor_add_id,
+                                      static_cast<jlong>(row_num_start),
+                                      static_cast<jlong>(row_num_end));
+        // A pending Java exception must be consumed before any further JNI call.
+        RETURN_ERROR_IF_EXC(env);
+        return Status::OK();
+    }
+
+    // Feeds rhs's serialized executor state (restored by read()) into this state's executor.
+    // A state that was never initialized carries no data and is skipped.
+    Status merge(const AggregateJavaUdafData& rhs) {
+        if (rhs.first_init) {
+            return Status::OK();
+        }
+        JNIEnv* env = nullptr;
+        RETURN_NOT_OK_STATUS_WITH_WARN(JniUtil::GetJNIEnv(&env), "Java-Udaf merge function");
+        serialize_data = rhs.serialize_data;
+        jlong len = serialize_data.length();
+        jobject data = env->NewDirectByteBuffer(serialize_data.data(), len);
+        RETURN_ERROR_IF_EXC(env);
+        env->CallNonvirtualVoidMethod(executor_obj, executor_cl, executor_merge_id, data);
+        // Local references are only freed automatically when control returns to Java or a
+        // frame is popped; free this one so repeated merges don't accumulate references.
+        env->DeleteLocalRef(data);
+        RETURN_ERROR_IF_EXC(env);
+        return Status::OK();
+    }
+
+    // Writes the first_init flag followed by the executor's serialized state.
+    // A never-initialized state has no executor and writes an empty payload instead, which
+    // keeps the layout (flag, then bytes) that read() expects.
+    Status write(BufferWritable& buf) {
+        write_binary(first_init, buf);
+        if (first_init) {
+            serialize_data.clear();
+            write_binary(serialize_data, buf);
+            return Status::OK();
+        }
+        JNIEnv* env = nullptr;
+        RETURN_NOT_OK_STATUS_WITH_WARN(JniUtil::GetJNIEnv(&env), "Java-Udaf write function");
+        jlong len = env->GetLongField(executor_obj, serialize_len_id);
+        RETURN_ERROR_IF_EXC(env);
+        serialize_data.resize(len);
+        jobject data = env->NewDirectByteBuffer(serialize_data.data(), len);
+        RETURN_ERROR_IF_EXC(env);
+        env->CallNonvirtualVoidMethod(executor_obj, executor_cl, executor_serialize_id, data);
+        env->DeleteLocalRef(data);
+        RETURN_ERROR_IF_EXC(env);
+        write_binary(serialize_data, buf);
+        return Status::OK();
+    }
+
+    // Restores the fields emitted by write(), in the same order.
+    void read(BufferReadable& buf) {
+        read_binary(first_init, buf);
+        read_binary(serialize_data, buf);
+    }
+
+    // Appends one row to `to` and has the executor write the aggregate result into it.
+    // For a nullable result the executor also receives the null-map address; otherwise the
+    // null slot is set to -1.
+    Status get(IColumn& to, const DataTypePtr& result_type) const {
+        to.insert_default();
+        JNIEnv* env = nullptr;
+        RETURN_NOT_OK_STATUS_WITH_WARN(JniUtil::GetJNIEnv(&env), "Java-Udaf get value function");
+        if (result_type->is_nullable()) {
+            auto& nullable = assert_cast<ColumnNullable&>(to);
+            *output_null_value =
+                    reinterpret_cast<int64_t>(nullable.get_null_map_column().get_raw_data().data);
+            return evaluate_result(env, nullable.get_nested_column(), to.size() - 1, result_type);
+        }
+        *output_null_value = -1;
+        return evaluate_result(env, to, to.size() - 1, result_type);
+    }
+
+    // Helper for get(): publishes the value buffer of `data_col` (the non-nullable value
+    // column) and calls getValue(row). For strings, getValue returning false is treated as
+    // "buffer too small": chars is grown and the call retried.
+    Status evaluate_result(JNIEnv* env, IColumn& data_col, size_t row,
+                           const DataTypePtr& result_type) const {
+        if (data_col.is_column_string()) {
+            const ColumnString* str_col = check_and_get_column<ColumnString>(data_col);
+            ColumnString::Chars& chars = const_cast<ColumnString::Chars&>(str_col->get_chars());
+            ColumnString::Offsets& offsets =
+                    const_cast<ColumnString::Offsets&>(str_col->get_offsets());
+            int increase_buffer_size = 0;
+            *output_value_buffer = reinterpret_cast<int64_t>(chars.data());
+            *output_offsets_ptr = reinterpret_cast<int64_t>(offsets.data());
+            *output_intermediate_state_ptr = chars.size();
+            jboolean res = env->CallNonvirtualBooleanMethod(executor_obj, executor_cl,
+                                                            executor_result_id,
+                                                            static_cast<jlong>(row));
+            RETURN_ERROR_IF_EXC(env);
+            while (res != JNI_TRUE) {
+                int32_t buffer_size = IncreaseReservedBufferSize(increase_buffer_size);
+                increase_buffer_size++;
+                chars.reserve(chars.size() + buffer_size);
+                chars.resize(chars.size() + buffer_size);
+                // resize() may reallocate: republish the current address, otherwise the
+                // executor would write through a dangling pointer.
+                *output_value_buffer = reinterpret_cast<int64_t>(chars.data());
+                *output_intermediate_state_ptr = chars.size();
+                res = env->CallNonvirtualBooleanMethod(executor_obj, executor_cl,
+                                                       executor_result_id, static_cast<jlong>(row));
+                RETURN_ERROR_IF_EXC(env);
+            }
+        } else if (data_col.is_numeric() || data_col.is_column_decimal()) {
+            *output_value_buffer = reinterpret_cast<int64_t>(data_col.get_raw_data().data);
+            env->CallNonvirtualBooleanMethod(executor_obj, executor_cl, executor_result_id,
+                                             static_cast<jlong>(row));
+            RETURN_ERROR_IF_EXC(env);
+        } else {
+            return Status::InvalidArgument(strings::Substitute(
+                    "Java UDAF doesn't support return type is $0 now !", result_type->get_name()));
+        }
+        return Status::OK();
+    }
+
+    static const int32_t INITIAL_RESERVED_BUFFER_SIZE = 1024;
+    // TODO: we need a heuristic strategy to increase buffer size for variable-size output.
+    // NOTE(review): the shift overflows int32_t once n reaches 21.
+    static inline int32_t IncreaseReservedBufferSize(int n) {
+        return INITIAL_RESERVED_BUFFER_SIZE << n;
+    }
+
+    // Looks up the executor's method IDs and the serializeLength field ID.
+    Status register_func_id(JNIEnv* env) {
+        auto register_id = [&](const char* func_name, const char* func_sign, jmethodID& func_id) {
+            func_id = env->GetMethodID(executor_cl, func_name, func_sign);
+            Status s = JniUtil::GetJniExceptionMsg(env);
+            if (!s.ok()) {
+                return Status::InternalError(
+                        strings::Substitute("Java-Udaf register_func_id meet error and error is $0",
+                                            s.get_error_msg()));
+            }
+            return s;
+        };
+
+        RETURN_IF_ERROR(register_id("<init>", UDAF_EXECUTOR_CTOR_SIGNATURE, executor_ctor_id));
+        RETURN_IF_ERROR(register_id("add", UDAF_EXECUTOR_ADD_SIGNATURE, executor_add_id));
+        RETURN_IF_ERROR(register_id("close", UDAF_EXECUTOR_CLOSE_SIGNATURE, executor_close_id));
+        RETURN_IF_ERROR(register_id("merge", UDAF_EXECUTOR_MERGE_SIGNATURE, executor_merge_id));
+        RETURN_IF_ERROR(
+                register_id("serialize", UDAF_EXECUTOR_SERIALIZE_SIGNATURE, executor_serialize_id));
+        RETURN_IF_ERROR(
+                register_id("getValue", UDAF_EXECUTOR_RESULT_SIGNATURE, executor_result_id));
+        serialize_len_id = env->GetFieldID(executor_cl, "serializeLength", "J");
+        RETURN_ERROR_IF_EXC(env);
+        return Status::OK();
+    }
+
+public:
+    // JNI handles; nullptr until init_udaf() obtains them.
+    jclass executor_cl = nullptr;
+    jobject executor_obj = nullptr;
+    jmethodID executor_ctor_id = nullptr;
+
+    jmethodID executor_add_id = nullptr;
+    jmethodID executor_merge_id = nullptr;
+    jmethodID executor_serialize_id = nullptr;
+    jmethodID executor_result_id = nullptr;
+    jmethodID executor_close_id = nullptr;
+    jfieldID serialize_len_id = nullptr;
+
+    std::unique_ptr<int64_t[]> input_values_buffer_ptr;
+    std::unique_ptr<int64_t[]> input_nulls_buffer_ptr;
+    std::unique_ptr<int64_t[]> input_offsets_ptrs;
+    std::unique_ptr<int64_t> output_value_buffer;
+    std::unique_ptr<int64_t> output_null_value;
+    std::unique_ptr<int64_t> output_offsets_ptr;
+    std::unique_ptr<int64_t> output_intermediate_state_ptr;
+
+    // true until init_udaf() has created the executor (also restored by read()).
+    bool first_init = true;
+    int argument_size = 0;
+    std::string serialize_data;
+};
+
+class AggregateJavaUdaf final
+        : public IAggregateFunctionDataHelper<AggregateJavaUdafData, AggregateJavaUdaf> {
+public:
+    AggregateJavaUdaf(const TFunction& fn, const DataTypes& argument_types, const Array& parameters,
+                      const DataTypePtr& return_type)
+            : IAggregateFunctionDataHelper(argument_types, parameters),
+              fn_(fn),
+              return_type_(return_type) {}
+    // The base aggregate-function interface declares a virtual destructor; mark the override.
+    ~AggregateJavaUdaf() override = default;
+
+    // Builds a shared AggregateJavaUdaf for the given descriptor, argument types and return type.
+    static AggregateFunctionPtr create(const TFunction& fn, const DataTypes& argument_types,
+                                       const Array& parameters, const DataTypePtr& return_type) {
+        AggregateFunctionPtr function =
+                std::make_shared<AggregateJavaUdaf>(fn, argument_types, parameters, return_type);
+        return function;
+    }
+
+    // Placement-constructs a fresh per-group state sized for this function's arguments.
+    void create(AggregateDataPtr __restrict place) const override {
+        const auto num_args = argument_types.size();
+        new (place) Data(num_args);
+    }
+
+    // The name is taken from the function descriptor.
+    String get_name() const override {
+        return fn_.name.function_name;
+    }
+
+    // Reports the return type supplied at construction.
+    DataTypePtr get_return_type() const override {
+        return return_type_;
+    }
+
+    void add(AggregateDataPtr __restrict place, const IColumn** columns, size_t row_num,
+             Arena*) const override {
+        Status status = Status::OK();
+        RETURN_IF_STATUS_ERROR(status, data(place).init_udaf(fn_));

Review Comment:
   I tried doing the init operation in the `create` function, but the JVM could not be obtained there.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] Gabriel39 commented on a diff in pull request #9930: [Vectorized][UDF] support java-udaf

Posted by GitBox <gi...@apache.org>.
Gabriel39 commented on code in PR #9930:
URL: https://github.com/apache/incubator-doris/pull/9930#discussion_r890852003


##########
be/src/vec/aggregate_functions/aggregate_function_java_udaf.h:
##########
@@ -0,0 +1,363 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#ifdef LIBJVM
+
+#include <jni.h>
+#include <unistd.h>
+
+#include <cstdint>
+#include <memory>
+
+#include "common/status.h"
+#include "gen_cpp/Exprs_types.h"
+#include "runtime/user_function_cache.h"
+#include "util/jni-util.h"
+#include "vec/aggregate_functions/aggregate_function.h"
+#include "vec/columns/column_string.h"
+#include "vec/common/exception.h"
+#include "vec/common/string_ref.h"
+#include "vec/core/block.h"
+#include "vec/core/column_numbers.h"
+#include "vec/core/field.h"
+#include "vec/core/types.h"
+#include "vec/data_types/data_type_string.h"
+#include "vec/io/io_helper.h"
+
+namespace doris::vectorized {
+
+// Java executor class name and JNI method signatures. A JNI signature has the form
+// "(argument-types)return-type"; see
+// https://www.iitk.ac.in/esc101/05Aug/tutorial/native1.1/implementing/method.html
+// `constexpr` makes each pointer itself const, which gives it internal linkage. A plain
+// `const char*` defined in a header is a mutable global with external linkage, so it would
+// be defined again in every translation unit including this file (ODR / link errors).
+constexpr const char* UDAF_EXECUTOR_CLASS = "org/apache/doris/udf/UdafExecutor";
+constexpr const char* UDAF_EXECUTOR_CTOR_SIGNATURE = "([B)V";
+constexpr const char* UDAF_EXECUTOR_CLOSE_SIGNATURE = "()V";
+constexpr const char* UDAF_EXECUTOR_ADD_SIGNATURE = "(JJ)V";
+constexpr const char* UDAF_EXECUTOR_SERIALIZE_SIGNATURE = "(Ljava/lang/Object;)V";
+constexpr const char* UDAF_EXECUTOR_MERGE_SIGNATURE = "(Ljava/lang/Object;)V";
+constexpr const char* UDAF_EXECUTOR_RESULT_SIGNATURE = "(J)Z";
+constexpr const char* UDAF_EXECUTOR_SERIALIZE_LENGTH_SIGNATURE = "()J";
+
+struct AggregateJavaUdafData {
+public:
+    // Default construction allocates no exchange slots; flag the state as not yet
+    // initialized so that init_udaf() creates the executor on first use instead of
+    // reading an indeterminate flag.
+    AggregateJavaUdafData() { first_init = true; }
+
+    // Allocates one input slot per argument (values, null map, string offsets) plus single
+    // output slots; init_udaf() hands their addresses to the Java executor.
+    // The trailing "()" value-initializes the slots so unused entries read as 0.
+    AggregateJavaUdafData(int64_t num_args) {
+        argument_size = num_args;
+        first_init = true;
+        input_values_buffer_ptr.reset(new int64_t[num_args]());
+        input_nulls_buffer_ptr.reset(new int64_t[num_args]());
+        input_offsets_ptrs.reset(new int64_t[num_args]());
+        output_value_buffer.reset(new int64_t());
+        output_null_value.reset(new int64_t());
+        output_offsets_ptr.reset(new int64_t());
+        output_intermediate_state_ptr.reset(new int64_t());
+    }
+
+    // Closes the Java executor and releases its global reference.
+    // init_udaf() clears first_init only after creating the executor, so a state still
+    // flagged first_init has no executor and must not call into JNI with the unassigned
+    // handle. NOTE(review): read() also assigns first_init from serialized data, so a
+    // deserialized state may be flagged initialized without owning an executor; confirm.
+    // Errors are only logged: a destructor cannot return a Status.
+    ~AggregateJavaUdafData() {
+        if (first_init) {
+            return;
+        }
+        JNIEnv* env;
+        Status status;
+        RETURN_IF_STATUS_ERROR(status, JniUtil::GetJNIEnv(&env));
+        env->CallNonvirtualVoidMethod(executor_obj, executor_cl, executor_close_id);
+        status = JniUtil::GetJniExceptionMsg(env);
+        if (!status.ok()) {
+            LOG(WARNING) << "Java-Udaf close executor meet error: " << status.get_error_msg();
+        }
+        // Release the reference even when close() threw; returning early would leak it.
+        env->DeleteGlobalRef(executor_obj);
+    }
+
+    // Creates this state's Java executor on the first call; later calls return OK at once.
+    // Returns a non-OK Status if the JVM env, the executor class, its method IDs, the UDF
+    // jar path or the executor construction cannot be obtained.
+    Status init_udaf(const TFunction& fn) {
+        if (first_init) {
+            JNIEnv* env = nullptr;
+            RETURN_NOT_OK_STATUS_WITH_WARN(JniUtil::GetJNIEnv(&env),
+                                           "Java-Udaf init_udaf function");
+            RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, UDAF_EXECUTOR_CLASS, &executor_cl));
+            RETURN_NOT_OK_STATUS_WITH_WARN(register_func_id(env),
+                                           "Java-Udaf register_func_id function");
+
+            // Add a scoped cleanup jni reference object. This cleans up local refs made below.
+            JniLocalFrame jni_frame;
+            jobject local_executor = nullptr;
+            {
+                std::string local_location;
+                auto function_cache = UserFunctionCache::instance();
+                RETURN_IF_ERROR(function_cache->get_jarpath(fn.id, fn.hdfs_location, fn.checksum,
+                                                            &local_location));
+                TJavaUdfExecutorCtorParams ctor_params;
+                ctor_params.__set_fn(fn);
+                ctor_params.__set_location(local_location);
+                ctor_params.__set_input_offsets_ptrs((int64_t)input_offsets_ptrs.get());
+                ctor_params.__set_input_buffer_ptrs((int64_t)input_values_buffer_ptr.get());
+                ctor_params.__set_input_nulls_ptrs((int64_t)input_nulls_buffer_ptr.get());
+                ctor_params.__set_output_buffer_ptr((int64_t)output_value_buffer.get());
+
+                ctor_params.__set_output_null_ptr((int64_t)output_null_value.get());
+                ctor_params.__set_output_offsets_ptr((int64_t)output_offsets_ptr.get());
+                ctor_params.__set_output_intermediate_state_ptr(
+                        (int64_t)output_intermediate_state_ptr.get());
+
+                jbyteArray ctor_params_bytes;
+
+                // Pushed frame will be popped when jni_frame goes out-of-scope.
+                RETURN_IF_ERROR(jni_frame.push(env));
+                RETURN_IF_ERROR(SerializeThriftMsg(env, &ctor_params, &ctor_params_bytes));
+                local_executor = env->NewObject(executor_cl, executor_ctor_id, ctor_params_bytes);
+            }
+            RETURN_ERROR_IF_EXC(env);
+            // Promote to a global reference before jni_frame pops the local one. executor_obj
+            // only ever receives the global reference, never the frame-owned local one.
+            RETURN_IF_ERROR(JniUtil::LocalToGlobalRef(env, local_executor, &executor_obj));
+            first_init = false;
+        }
+        return Status::OK();
+    }
+
+    // Publishes every argument column's buffer addresses (values, null map, string offsets)
+    // in the shared slots, then calls the executor's add with row_num_start and row_num_end
+    // (forwarded unchanged; presumably a half-open range, confirm against UdafExecutor.add).
+    // Returns InvalidArgument for unsupported argument types, or the Java error if add throws.
+    Status add(const IColumn** columns, size_t row_num_start, size_t row_num_end,
+               const DataTypes& argument_types) {
+        JNIEnv* env = nullptr;
+        RETURN_NOT_OK_STATUS_WITH_WARN(JniUtil::GetJNIEnv(&env), "Java-Udaf add function");
+        for (int arg_idx = 0; arg_idx < argument_size; ++arg_idx) {
+            auto data_col = columns[arg_idx];
+            if (auto* nullable = check_and_get_column<const ColumnNullable>(*columns[arg_idx])) {
+                data_col = nullable->get_nested_column_ptr();
+                auto null_col = check_and_get_column<ColumnVector<UInt8>>(
+                        nullable->get_null_map_column_ptr());
+                input_nulls_buffer_ptr.get()[arg_idx] =
+                        reinterpret_cast<int64_t>(null_col->get_data().data());
+            } else {
+                // -1: this argument column is not nullable.
+                input_nulls_buffer_ptr.get()[arg_idx] = -1;
+            }
+            if (data_col->is_column_string()) {
+                const ColumnString* str_col = check_and_get_column<ColumnString>(data_col);
+                input_values_buffer_ptr.get()[arg_idx] =
+                        reinterpret_cast<int64_t>(str_col->get_chars().data());
+                input_offsets_ptrs.get()[arg_idx] =
+                        reinterpret_cast<int64_t>(str_col->get_offsets().data());
+            } else if (data_col->is_numeric() || data_col->is_column_decimal()) {
+                input_values_buffer_ptr.get()[arg_idx] =
+                        reinterpret_cast<int64_t>(data_col->get_raw_data().data);
+            } else {
+                return Status::InvalidArgument(
+                        strings::Substitute("Java UDAF doesn't support type is $0 now !",
+                                            argument_types[arg_idx]->get_name()));
+            }
+        }
+        env->CallNonvirtualVoidMethod(executor_obj, executor_cl, executor_add_id,
+                                      static_cast<jlong>(row_num_start),
+                                      static_cast<jlong>(row_num_end));
+        // A pending Java exception must be consumed before any further JNI call.
+        RETURN_ERROR_IF_EXC(env);
+        return Status::OK();
+    }
+
+    // Feeds rhs's serialized executor state (restored by read()) into this state's executor.
+    // A state that was never initialized carries no data and is skipped.
+    Status merge(const AggregateJavaUdafData& rhs) {
+        if (rhs.first_init) {
+            return Status::OK();
+        }
+        JNIEnv* env = nullptr;
+        RETURN_NOT_OK_STATUS_WITH_WARN(JniUtil::GetJNIEnv(&env), "Java-Udaf merge function");
+        serialize_data = rhs.serialize_data;
+        jlong len = serialize_data.length();
+        jobject data = env->NewDirectByteBuffer(serialize_data.data(), len);
+        RETURN_ERROR_IF_EXC(env);
+        env->CallNonvirtualVoidMethod(executor_obj, executor_cl, executor_merge_id, data);
+        // Local references are only freed automatically when control returns to Java or a
+        // frame is popped; free this one so repeated merges don't accumulate references.
+        env->DeleteLocalRef(data);
+        RETURN_ERROR_IF_EXC(env);
+        return Status::OK();
+    }
+
+    // Writes the first_init flag followed by the executor's serialized state.
+    // A never-initialized state has no executor (init_udaf() clears first_init only after
+    // creating it), so it writes an empty payload, keeping the layout read() expects.
+    Status write(BufferWritable& buf) {
+        write_binary(first_init, buf);
+        if (first_init) {
+            serialize_data.clear();
+            write_binary(serialize_data, buf);
+            return Status::OK();
+        }
+        JNIEnv* env = nullptr;
+        RETURN_NOT_OK_STATUS_WITH_WARN(JniUtil::GetJNIEnv(&env), "Java-Udaf write function");
+        jlong len =
+                env->CallNonvirtualLongMethod(executor_obj, executor_cl, executor_serialize_len_id);
+        RETURN_ERROR_IF_EXC(env);
+        serialize_data.resize(len);
+        jobject data = env->NewDirectByteBuffer(serialize_data.data(), len);
+        RETURN_ERROR_IF_EXC(env);
+        env->CallNonvirtualVoidMethod(executor_obj, executor_cl, executor_serialize_id, data);
+        // Free the buffer wrapper explicitly; see the JNI local-reference rules.
+        env->DeleteLocalRef(data);
+        RETURN_ERROR_IF_EXC(env);
+        write_binary(serialize_data, buf);
+        return Status::OK();
+    }
+
+    void read(BufferReadable& buf) {
+        read_binary(first_init, buf);
+        read_binary(serialize_data, buf);
+    }
+
+    Status get(IColumn& to, const DataTypePtr& result_type) const {
+        to.insert_default();
+        JNIEnv* env = nullptr;
+        RETURN_NOT_OK_STATUS_WITH_WARN(JniUtil::GetJNIEnv(&env), "Java-Udaf get value function");
+        if (result_type->is_nullable()) {
+            auto& nullable = assert_cast<ColumnNullable&>(to);
+            *output_null_value =
+                    reinterpret_cast<int64_t>(nullable.get_null_map_column().get_raw_data().data);
+            auto& data_col = nullable.get_nested_column();
+
+#ifndef EVALUATE_JAVA_UDAF
+#define EVALUATE_JAVA_UDAF                                                                        \
+    if (data_col.is_column_string()) {                                                            \
+        const ColumnString* str_col = check_and_get_column<ColumnString>(data_col);               \
+        ColumnString::Chars& chars = const_cast<ColumnString::Chars&>(str_col->get_chars());      \
+        ColumnString::Offsets& offsets =                                                          \
+                const_cast<ColumnString::Offsets&>(str_col->get_offsets());                       \
+        int increase_buffer_size = 0;                                                             \
+        *output_value_buffer = reinterpret_cast<int64_t>(chars.data());                           \
+        *output_offsets_ptr = reinterpret_cast<int64_t>(offsets.data());                          \
+        *output_intermediate_state_ptr = chars.size();                                            \
+        jboolean res = env->CallNonvirtualBooleanMethod(executor_obj, executor_cl,                \
+                                                        executor_result_id, to.size() - 1);       \
+        while (res != JNI_TRUE) {                                                                 \
+            int32_t buffer_size = IncreaseReservedBufferSize(increase_buffer_size);               \
+            increase_buffer_size++;                                                               \
+            chars.reserve(chars.size() + buffer_size);                                            \
+            chars.resize(chars.size() + buffer_size);                                             \
+            *output_intermediate_state_ptr = chars.size();                                        \
+            res = env->CallNonvirtualBooleanMethod(executor_obj, executor_cl, executor_result_id, \
+                                                   to.size() - 1);                                \
+        }                                                                                         \
+    } else if (data_col.is_numeric() || data_col.is_column_decimal()) {                           \
+        *output_value_buffer = reinterpret_cast<int64_t>(data_col.get_raw_data().data);           \
+        env->CallNonvirtualBooleanMethod(executor_obj, executor_cl, executor_result_id,           \
+                                         to.size() - 1);                                          \
+    } else {                                                                                      \
+        return Status::InvalidArgument(strings::Substitute(                                       \
+                "Java UDAF doesn't support return type is $0 now !", result_type->get_name()));   \
+    }
+#endif
+            EVALUATE_JAVA_UDAF;
+        } else {
+            *output_null_value = -1;
+            *output_value_buffer = reinterpret_cast<int64_t>(to.get_raw_data().data);
+            auto& data_col = to;
+            EVALUATE_JAVA_UDAF;
+            env->CallNonvirtualBooleanMethod(executor_obj, executor_cl, executor_result_id,
+                                             to.size() - 1);
+        }
+        return Status::OK();
+    }
+
+private:
+    static const int32_t INITIAL_RESERVED_BUFFER_SIZE = 1024;
+    // TODO: we need a heuristic strategy to increase buffer size for variable-size output.
+    static inline int32_t IncreaseReservedBufferSize(int n) {

Review Comment:
   Move this function to a common file and call it from both the UDAF and the UDF.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] BiteTheDDDDt commented on a diff in pull request #9930: [Vectorized][UDF] support java-udaf

Posted by GitBox <gi...@apache.org>.
BiteTheDDDDt commented on code in PR #9930:
URL: https://github.com/apache/incubator-doris/pull/9930#discussion_r887645411


##########
be/src/vec/aggregate_functions/aggregate_function_java_udaf.h:
##########
@@ -0,0 +1,392 @@
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#ifdef LIBJVM
+
+#include <jni.h>
+#include <unistd.h>
+
+#include <cstdint>
+#include <memory>
+
+#include "common/status.h"
+#include "gen_cpp/Exprs_types.h"
+#include "runtime/user_function_cache.h"
+#include "util/jni-util.h"
+#include "vec/aggregate_functions/aggregate_function.h"
+#include "vec/columns/column_string.h"
+#include "vec/common/exception.h"
+#include "vec/common/string_ref.h"
+#include "vec/core/block.h"
+#include "vec/core/column_numbers.h"
+#include "vec/core/field.h"
+#include "vec/core/types.h"
+#include "vec/data_types/data_type_string.h"
+#include "vec/io/io_helper.h"
+
+namespace doris::vectorized {
+
+const char* UDAF_EXECUTOR_CLASS = "org/apache/doris/udf/UdafExecutor";
+const char* UDAF_EXECUTOR_CTOR_SIGNATURE = "([B)V";
+const char* UDAF_EXECUTOR_CLOSE_SIGNATURE = "()V";
+const char* UDAF_EXECUTOR_CREATE_SIGNATURE = "()Ljava/lang/Object;";
+const char* UDAF_EXECUTOR_DESTORY_SIGNATURE = "()V";
+const char* UDAF_EXECUTOR_ADD_SIGNATURE = "(J)V";
+const char* UDAF_EXECUTOR_MERGE_SIGNATURE = "(Ljava/lang/Object;)V";
+const char* UDAF_EXECUTOR_SERIALIZE_SIGNATURE = "(Ljava/lang/Object;)V";
+const char* UDAF_EXECUTOR_DESERIALIZE_SIGNATURE = "(Ljava/lang/Object;)V";
+const char* UDAF_EXECUTOR_RESULT_SIGNATURE = "(J)Z";
+
+struct AggregateJavaUdafData {
+public:
+    AggregateJavaUdafData() = default;
+    AggregateJavaUdafData(int64_t num_args) {
+        argument_size = num_args;
+        first_init = true;
+        input_values_buffer_ptr.reset(new int64_t[num_args]);
+        input_nulls_buffer_ptr.reset(new int64_t[num_args]);
+        input_offsets_ptrs.reset(new int64_t[num_args]);
+        output_value_buffer.reset(new int64_t);
+        output_null_value.reset(new int64_t);
+        batch_size_ptr.reset(new int32_t);
+        output_offsets_ptr.reset(new int64_t);
+        output_intermediate_state_ptr.reset(new int64_t);
+    }
+
+    ~AggregateJavaUdafData() {
+        JNIEnv* env;
+        Status status;
+        RETURN_IF_STATUS_ERROR(status, JniUtil::GetJNIEnv(&env));
+        env->CallNonvirtualVoidMethod(executor_obj, executor_cl_, executor_close_id_);
+        Status s = JniUtil::GetJniExceptionMsg(env);
+        if (!s.ok()) {
+            LOG(WARNING) << "meet some error in destroy: " << s.get_error_msg();
+        }
+        env->DeleteGlobalRef(executor_obj);
+    }
+
+    Status init_udaf(const TFunction& fn) {
+        if (first_init) {
+            JNIEnv* env = nullptr;
+            RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env));
+            if (env == nullptr) {
+                return Status::InternalError("Failed to get/create JVM");
+            }
+            RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, UDAF_EXECUTOR_CLASS, &executor_cl_));
+
+            Status ret_code = register_func_id(env);
+            if (!ret_code.ok()) {
+                LOG(WARNING) << "register_func_id has error : " << ret_code.get_error_msg();
+            }
+
+            // Add a scoped cleanup jni reference object. This cleans up local refs made below.
+            JniLocalFrame jni_frame;
+            {
+                std::string local_location;
+                auto function_cache = UserFunctionCache::instance();
+                RETURN_IF_ERROR(function_cache->get_jarpath(fn.id, fn.hdfs_location, fn.checksum,
+                                                            &local_location));
+                TJavaUdfExecutorCtorParams ctor_params;
+                ctor_params.__set_fn(fn);
+                ctor_params.__set_location(local_location);
+                ctor_params.__set_input_offsets_ptrs((int64_t)input_offsets_ptrs.get());
+                ctor_params.__set_input_buffer_ptrs((int64_t)input_values_buffer_ptr.get());
+                ctor_params.__set_input_nulls_ptrs((int64_t)input_nulls_buffer_ptr.get());
+                ctor_params.__set_output_buffer_ptr((int64_t)output_value_buffer.get());
+
+                ctor_params.__set_output_null_ptr((int64_t)output_null_value.get());
+                ctor_params.__set_output_offsets_ptr((int64_t)output_offsets_ptr.get());
+                ctor_params.__set_output_intermediate_state_ptr(
+                        (int64_t)output_intermediate_state_ptr.get());
+                ctor_params.__set_batch_size_ptr((int64_t)batch_size_ptr.get());
+
+                jbyteArray ctor_params_bytes;
+
+                // Pushed frame will be popped when jni_frame goes out-of-scope.
+                RETURN_IF_ERROR(jni_frame.push(env));
+                RETURN_IF_ERROR(SerializeThriftMsg(env, &ctor_params, &ctor_params_bytes));
+                executor_obj = env->NewObject(executor_cl_, executor_ctor_id_, ctor_params_bytes);
+            }
+            RETURN_ERROR_IF_EXC(env);
+            RETURN_IF_ERROR(JniUtil::LocalToGlobalRef(env, executor_obj, &executor_obj));
+            first_init = false;
+        }
+        return Status::OK();
+    }
+
+    void add(const IColumn** columns, size_t row_num, const DataTypes& argument_types) const {
+        JNIEnv* env = nullptr;
+        Status ret_code = JniUtil::GetJNIEnv(&env);
+        if (env == nullptr || !ret_code.ok()) {
+            LOG(WARNING) << "Java-Udaf get error when add: " << ret_code.get_error_msg();
+        }
+        *batch_size_ptr = columns[0]->size();
+        for (int arg_idx = 0; arg_idx < argument_size; ++arg_idx) {
+            auto data_col = columns[arg_idx];
+            if (auto* nullable = check_and_get_column<const ColumnNullable>(*columns[arg_idx])) {
+                data_col = nullable->get_nested_column_ptr();
+                auto null_col = check_and_get_column<ColumnVector<UInt8>>(
+                        nullable->get_null_map_column_ptr());
+                input_nulls_buffer_ptr.get()[arg_idx] =
+                        reinterpret_cast<int64_t>(null_col->get_data().data());
+            } else {
+                input_nulls_buffer_ptr.get()[arg_idx] = -1;
+            }
+            if (data_col->is_column_string()) {
+                const ColumnString* str_col = check_and_get_column<ColumnString>(data_col);
+                input_values_buffer_ptr.get()[arg_idx] =
+                        reinterpret_cast<int64_t>(str_col->get_chars().data());
+                input_offsets_ptrs.get()[arg_idx] =
+                        reinterpret_cast<int64_t>(str_col->get_offsets().data());
+            } else if (data_col->is_numeric() || data_col->is_column_decimal()) {
+                input_values_buffer_ptr.get()[arg_idx] =
+                        reinterpret_cast<int64_t>(data_col->get_raw_data().data);
+            } else {
+                LOG(WARNING) << "Java UDAF doesn't support type: "
+                             << argument_types[arg_idx]->get_name() << " now !";
+            }
+        }
+        env->CallNonvirtualVoidMethod(executor_obj, executor_cl_, executor_add_id_, row_num);
+    }
+
+    void merge(const AggregateJavaUdafData& rhs) {
+        if (rhs.first_init) {
+            return;
+        }
+        JNIEnv* env = nullptr;
+        Status ret_code = JniUtil::GetJNIEnv(&env);
+        if (env == nullptr || !ret_code.ok()) {
+            LOG(WARNING) << "Java-Udaf get error when merge: " << ret_code.get_error_msg();
+        }
+        serialize_data = rhs.serialize_data;
+        long len = serialize_data.length();
+        jobject data = env->NewDirectByteBuffer(serialize_data.data(), len);
+        env->CallNonvirtualVoidMethod(executor_obj, executor_cl_, executor_merge_id_, data);
+    }
+
+    void write(BufferWritable& buf) {
+        write_binary(first_init, buf);
+        JNIEnv* env = nullptr;
+        Status ret_code = JniUtil::GetJNIEnv(&env);
+        if (env == nullptr || !ret_code.ok()) {
+            LOG(WARNING) << "Java-Udaf get error when write: " << ret_code.get_error_msg();
+        }
+        jlong len = env->GetLongField(executor_obj, serialize_len_id);
+        serialize_data.resize(len);
+        jobject data = env->NewDirectByteBuffer(serialize_data.data(), len);
+        env->CallNonvirtualVoidMethod(executor_obj, executor_cl_, executor_serialize_id_, data);
+        write_binary(serialize_data, buf);
+    }
+
+    void read(BufferReadable& buf) {
+        read_binary(first_init, buf);
+        JNIEnv* env = nullptr;
+        Status ret_code = JniUtil::GetJNIEnv(&env);
+        if (env == nullptr || !ret_code.ok()) {
+            LOG(WARNING) << "Java-Udaf get error when read: " << ret_code.get_error_msg();
+        }
+        read_binary(serialize_data, buf);
+        long len = serialize_data.length();
+        jobject data = env->NewDirectByteBuffer(serialize_data.data(), len);
+        env->CallNonvirtualVoidMethod(executor_obj, executor_cl_, executor_deserialize_id_, data);
+    }
+
+    void get(IColumn& to, const DataTypePtr& result_type) const {
+        to.insert_default();
+        JNIEnv* env = nullptr;
+        Status ret_code = JniUtil::GetJNIEnv(&env);
+        if (env == nullptr || !ret_code.ok()) {
+            LOG(WARNING) << "Java-Udaf get error when get value: " << ret_code.get_error_msg();
+        }
+        if (result_type->is_nullable()) {
+            auto& nullable = assert_cast<ColumnNullable&>(to);
+            *output_null_value =
+                    reinterpret_cast<int64_t>(nullable.get_null_map_column().get_raw_data().data);
+            auto& data_col = nullable.get_nested_column();
+
+#ifndef EVALUATE_JAVA_UDAF
+#define EVALUATE_JAVA_UDAF                                                                   \
+    if (data_col.is_column_string()) {                                                       \
+        const ColumnString* str_col = check_and_get_column<ColumnString>(to);                \
+        ColumnString::Chars& chars = const_cast<ColumnString::Chars&>(str_col->get_chars()); \
+        ColumnString::Offsets& offsets =                                                     \
+                const_cast<ColumnString::Offsets&>(str_col->get_offsets());                  \
+        int increase_buffer_size = 0;                                                        \
+        int32_t buffer_size = IncreaseReservedBufferSize(increase_buffer_size);              \
+        chars.reserve(chars.size() + buffer_size);                                           \
+        chars.resize(chars.size() + buffer_size);                                            \
+        *output_value_buffer = reinterpret_cast<int64_t>(chars.data());                      \
+        *output_offsets_ptr = reinterpret_cast<int64_t>(offsets.data());                     \
+        *output_intermediate_state_ptr = buffer_size;                                        \
+        jboolean res = env->CallNonvirtualBooleanMethod(executor_obj, executor_cl_,          \
+                                                        executor_result_id_, to.size() - 1); \
+        while (res != JNI_TRUE) {                                                            \
+            increase_buffer_size++;                                                          \
+            int32_t buffer_size = IncreaseReservedBufferSize(increase_buffer_size);          \
+            chars.resize(chars.size() + buffer_size);                                        \
+            *output_value_buffer = reinterpret_cast<int64_t>(chars.data());                  \
+            *output_intermediate_state_ptr = buffer_size;                                    \
+            res = env->CallNonvirtualBooleanMethod(executor_obj, executor_cl_,               \
+                                                   executor_result_id_, to.size() - 1);      \
+        }                                                                                    \
+    } else if (data_col.is_numeric() || data_col.is_column_decimal()) {                      \
+        *output_value_buffer = reinterpret_cast<int64_t>(data_col.get_raw_data().data);      \
+        env->CallNonvirtualBooleanMethod(executor_obj, executor_cl_, executor_result_id_,    \
+                                         to.size() - 1);                                     \
+    } else {                                                                                 \
+        LOG(WARNING) << "Java UDAF doesn't support return type: " << result_type->get_name() \
+                     << "$0 now !";                                                          \
+    }
+#endif
+            EVALUATE_JAVA_UDAF;
+        } else {
+            *output_null_value = -1;
+            *output_value_buffer = reinterpret_cast<int64_t>(to.get_raw_data().data);
+            auto& data_col = to;
+            EVALUATE_JAVA_UDAF;
+            env->CallNonvirtualBooleanMethod(executor_obj, executor_cl_, executor_result_id_,
+                                             to.size() - 1);
+        }
+    }
+
+    static const int32_t INITIAL_RESERVED_BUFFER_SIZE = 1024;
+    // TODO: we need a heuristic strategy to increase buffer size for variable-size output.
+    static inline int32_t IncreaseReservedBufferSize(int n) {
+        return INITIAL_RESERVED_BUFFER_SIZE << n;
+    }
+
+    Status register_func_id(JNIEnv* env) {
+        auto register_id = [&](const char* func_name, const char* func_sign, jmethodID& func_id) {
+            func_id = env->GetMethodID(executor_cl_, func_name, func_sign);
+            Status s = JniUtil::GetJniExceptionMsg(env);
+            if (!s.ok()) {
+                LOG(WARNING) << "when register_func_id get error: " << s.get_error_msg();
+            }
+            return s;
+        };
+
+        RETURN_IF_ERROR(register_id("<init>", UDAF_EXECUTOR_CTOR_SIGNATURE, executor_ctor_id_));
+        RETURN_IF_ERROR(register_id("create", UDAF_EXECUTOR_CREATE_SIGNATURE, executor_create_id_));
+        RETURN_IF_ERROR(register_id("add", UDAF_EXECUTOR_ADD_SIGNATURE, executor_add_id_));
+        RETURN_IF_ERROR(register_id("close", UDAF_EXECUTOR_CLOSE_SIGNATURE, executor_close_id_));
+        RETURN_IF_ERROR(register_id("merge", UDAF_EXECUTOR_MERGE_SIGNATURE, executor_merge_id_));
+        RETURN_IF_ERROR(register_id("serialize", UDAF_EXECUTOR_SERIALIZE_SIGNATURE,
+                                    executor_serialize_id_));
+        RETURN_IF_ERROR(register_id("deserialize", UDAF_EXECUTOR_DESERIALIZE_SIGNATURE,
+                                    executor_deserialize_id_));
+        RETURN_IF_ERROR(
+                register_id("getValue", UDAF_EXECUTOR_RESULT_SIGNATURE, executor_result_id_));
+        serialize_len_id = env->GetFieldID(executor_cl_, "serializeLength", "J");
+        RETURN_ERROR_IF_EXC(env);
+        return Status::OK();
+    }
+
+public:
+    jclass executor_cl_;
+    jobject executor_obj;
+    jmethodID executor_ctor_id_;
+
+    jmethodID executor_create_id_;
+    jmethodID executor_add_id_;
+    jmethodID executor_merge_id_;
+    jmethodID executor_serialize_id_;
+    jmethodID executor_deserialize_id_;
+    jmethodID executor_result_id_;
+    jmethodID executor_close_id_;
+    jfieldID serialize_len_id;
+
+    std::unique_ptr<int64_t[]> input_values_buffer_ptr;
+    std::unique_ptr<int64_t[]> input_nulls_buffer_ptr;
+    std::unique_ptr<int64_t[]> input_offsets_ptrs;
+    std::unique_ptr<int64_t> output_value_buffer;
+    std::unique_ptr<int64_t> output_null_value;
+    std::unique_ptr<int64_t> output_offsets_ptr;
+    std::unique_ptr<int64_t> output_intermediate_state_ptr;
+    std::unique_ptr<int32_t> batch_size_ptr;
+
+    bool first_init;
+    int argument_size = 0;
+    std::string serialize_data;
+};
+
+class AggregateJavaUdaf final
+        : public IAggregateFunctionDataHelper<AggregateJavaUdafData, AggregateJavaUdaf> {
+public:
+    AggregateJavaUdaf(const TFunction& fn, const DataTypes& argument_types, const Array& parameters,
+                      const DataTypePtr& return_type)
+            : IAggregateFunctionDataHelper(argument_types, parameters),
+              fn_(fn),
+              return_type_(return_type) {}
+    ~AggregateJavaUdaf() = default;
+
+    static AggregateFunctionPtr create(const TFunction& fn, const DataTypes& argument_types,
+                                       const Array& parameters, const DataTypePtr& return_type) {
+        return std::make_shared<AggregateJavaUdaf>(fn, argument_types, parameters, return_type);
+    }
+
+    void create(AggregateDataPtr __restrict place) const override {
+        new (place) Data(argument_types.size());
+    }
+
+    String get_name() const override { return fn_.name.function_name; }
+
+    DataTypePtr get_return_type() const override { return return_type_; }
+
+    void add(AggregateDataPtr __restrict place, const IColumn** columns, size_t row_num,
+             Arena*) const override {
+        Status status = data(place).init_udaf(fn_);
+        if (!status.ok()) {
+            LOG(WARNING) << " in add pressore  of init udaf has error: " << status.get_error_msg();

Review Comment:
   This log message contains multiple consecutive spaces.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] HappenLee commented on a diff in pull request #9930: [Vectorized][UDF] support java-udaf

Posted by GitBox <gi...@apache.org>.
HappenLee commented on code in PR #9930:
URL: https://github.com/apache/incubator-doris/pull/9930#discussion_r889465925


##########
be/src/vec/aggregate_functions/aggregate_function_java_udaf.h:
##########
@@ -0,0 +1,361 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#ifdef LIBJVM
+
+#include <jni.h>
+#include <unistd.h>
+
+#include <cstdint>
+#include <memory>
+
+#include "common/status.h"
+#include "gen_cpp/Exprs_types.h"
+#include "runtime/user_function_cache.h"
+#include "util/jni-util.h"
+#include "vec/aggregate_functions/aggregate_function.h"
+#include "vec/columns/column_string.h"
+#include "vec/common/exception.h"
+#include "vec/common/string_ref.h"
+#include "vec/core/block.h"
+#include "vec/core/column_numbers.h"
+#include "vec/core/field.h"
+#include "vec/core/types.h"
+#include "vec/data_types/data_type_string.h"
+#include "vec/io/io_helper.h"
+
+namespace doris::vectorized {
+
+const char* UDAF_EXECUTOR_CLASS = "org/apache/doris/udf/UdafExecutor";
+const char* UDAF_EXECUTOR_CTOR_SIGNATURE = "([B)V";
+const char* UDAF_EXECUTOR_CLOSE_SIGNATURE = "()V";
+const char* UDAF_EXECUTOR_ADD_SIGNATURE = "(JJ)V";
+const char* UDAF_EXECUTOR_SERIALIZE_SIGNATURE = "(Ljava/lang/Object;)V";
+const char* UDAF_EXECUTOR_MERGE_SIGNATURE = "(Ljava/lang/Object;)V";
+const char* UDAF_EXECUTOR_RESULT_SIGNATURE = "(J)Z";
+
+struct AggregateJavaUdafData {
+public:
+    AggregateJavaUdafData() = default;
+    AggregateJavaUdafData(int64_t num_args) {
+        argument_size = num_args;
+        first_init = true;
+        input_values_buffer_ptr.reset(new int64_t[num_args]);
+        input_nulls_buffer_ptr.reset(new int64_t[num_args]);
+        input_offsets_ptrs.reset(new int64_t[num_args]);
+        output_value_buffer.reset(new int64_t);
+        output_null_value.reset(new int64_t);
+        output_offsets_ptr.reset(new int64_t);
+        output_intermediate_state_ptr.reset(new int64_t);
+    }
+
+    ~AggregateJavaUdafData() {
+        JNIEnv* env;
+        Status status;
+        RETURN_IF_STATUS_ERROR(status, JniUtil::GetJNIEnv(&env));
+        env->CallNonvirtualVoidMethod(executor_obj, executor_cl, executor_close_id);
+        RETURN_IF_STATUS_ERROR(status, JniUtil::GetJniExceptionMsg(env));
+        env->DeleteGlobalRef(executor_obj);
+    }
+
+    Status init_udaf(const TFunction& fn) {
+        if (first_init) {
+            JNIEnv* env = nullptr;
+            RETURN_NOT_OK_STATUS_WITH_WARN(JniUtil::GetJNIEnv(&env),
+                                           "Java-Udaf init_udaf function");
+            RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, UDAF_EXECUTOR_CLASS, &executor_cl));
+            RETURN_NOT_OK_STATUS_WITH_WARN(register_func_id(env),
+                                           "Java-Udaf register_func_id function");
+
+            // Add a scoped cleanup jni reference object. This cleans up local refs made below.
+            JniLocalFrame jni_frame;
+            {
+                std::string local_location;
+                auto function_cache = UserFunctionCache::instance();
+                RETURN_IF_ERROR(function_cache->get_jarpath(fn.id, fn.hdfs_location, fn.checksum,
+                                                            &local_location));
+                TJavaUdfExecutorCtorParams ctor_params;
+                ctor_params.__set_fn(fn);
+                ctor_params.__set_location(local_location);
+                ctor_params.__set_input_offsets_ptrs((int64_t)input_offsets_ptrs.get());
+                ctor_params.__set_input_buffer_ptrs((int64_t)input_values_buffer_ptr.get());
+                ctor_params.__set_input_nulls_ptrs((int64_t)input_nulls_buffer_ptr.get());
+                ctor_params.__set_output_buffer_ptr((int64_t)output_value_buffer.get());
+
+                ctor_params.__set_output_null_ptr((int64_t)output_null_value.get());
+                ctor_params.__set_output_offsets_ptr((int64_t)output_offsets_ptr.get());
+                ctor_params.__set_output_intermediate_state_ptr(
+                        (int64_t)output_intermediate_state_ptr.get());
+
+                jbyteArray ctor_params_bytes;
+
+                // Pushed frame will be popped when jni_frame goes out-of-scope.
+                RETURN_IF_ERROR(jni_frame.push(env));
+                RETURN_IF_ERROR(SerializeThriftMsg(env, &ctor_params, &ctor_params_bytes));
+                executor_obj = env->NewObject(executor_cl, executor_ctor_id, ctor_params_bytes);
+            }
+            RETURN_ERROR_IF_EXC(env);
+            RETURN_IF_ERROR(JniUtil::LocalToGlobalRef(env, executor_obj, &executor_obj));
+            first_init = false;
+        }
+        return Status::OK();
+    }
+
+    Status add(const IColumn** columns, size_t row_num_start, size_t row_num_end,
+               const DataTypes& argument_types) {
+        JNIEnv* env = nullptr;
+        RETURN_NOT_OK_STATUS_WITH_WARN(JniUtil::GetJNIEnv(&env), "Java-Udaf add function");
+        for (int arg_idx = 0; arg_idx < argument_size; ++arg_idx) {
+            auto data_col = columns[arg_idx];
+            if (auto* nullable = check_and_get_column<const ColumnNullable>(*columns[arg_idx])) {
+                data_col = nullable->get_nested_column_ptr();
+                auto null_col = check_and_get_column<ColumnVector<UInt8>>(
+                        nullable->get_null_map_column_ptr());
+                input_nulls_buffer_ptr.get()[arg_idx] =
+                        reinterpret_cast<int64_t>(null_col->get_data().data());
+            } else {
+                input_nulls_buffer_ptr.get()[arg_idx] = -1;
+            }
+            if (data_col->is_column_string()) {
+                const ColumnString* str_col = check_and_get_column<ColumnString>(data_col);
+                input_values_buffer_ptr.get()[arg_idx] =
+                        reinterpret_cast<int64_t>(str_col->get_chars().data());
+                input_offsets_ptrs.get()[arg_idx] =
+                        reinterpret_cast<int64_t>(str_col->get_offsets().data());
+            } else if (data_col->is_numeric() || data_col->is_column_decimal()) {
+                input_values_buffer_ptr.get()[arg_idx] =
+                        reinterpret_cast<int64_t>(data_col->get_raw_data().data);
+            } else {
+                return Status::InvalidArgument(
+                        strings::Substitute("Java UDAF doesn't support type is $0 now !",
+                                            argument_types[arg_idx]->get_name()));
+            }
+        }
+        env->CallNonvirtualVoidMethod(executor_obj, executor_cl, executor_add_id, row_num_start,
+                                      row_num_end);
+        return Status::OK();
+    }
+
+    Status merge(const AggregateJavaUdafData& rhs) {
+        if (rhs.first_init) {
+            return Status::OK();
+        }
+        JNIEnv* env = nullptr;
+        RETURN_NOT_OK_STATUS_WITH_WARN(JniUtil::GetJNIEnv(&env), "Java-Udaf merge function");
+        serialize_data = rhs.serialize_data;
+        long len = serialize_data.length();
+        jobject data = env->NewDirectByteBuffer(serialize_data.data(), len);
+        env->CallNonvirtualVoidMethod(executor_obj, executor_cl, executor_merge_id, data);
+        return Status::OK();
+    }
+
+    // Serializes this state into `buf`: the first_init flag, then the bytes produced by the
+    // Java executor's serialize() into a buffer sized from its `serializeLength` field.
+    Status write(BufferWritable& buf) {
+        write_binary(first_init, buf);
+        JNIEnv* env = nullptr;
+        RETURN_NOT_OK_STATUS_WITH_WARN(JniUtil::GetJNIEnv(&env), "Java-Udaf write function");
+        jlong len = env->GetLongField(executor_obj, serialize_len_id);
+        RETURN_ERROR_IF_EXC(env);
+        serialize_data.resize(len);
+        jobject data = env->NewDirectByteBuffer(serialize_data.data(), len);
+        RETURN_ERROR_IF_EXC(env);
+        env->CallNonvirtualVoidMethod(executor_obj, executor_cl, executor_serialize_id, data);
+        // Release the local ref right away; native-created local refs otherwise accumulate.
+        env->DeleteLocalRef(data);
+        RETURN_ERROR_IF_EXC(env);
+        write_binary(serialize_data, buf);
+        return Status::OK();
+    }
+
+    // Inverse of write(): restores the first_init flag, then the serialized Java state bytes
+    // (consumed later by merge()).
+    void read(BufferReadable& buf) {
+        read_binary(this->first_init, buf);
+        read_binary(this->serialize_data, buf);
+    }
+
+    // Appends the aggregate's final value to `to` by calling the Java executor's getValue(row).
+    // A default row is inserted first and the Java side overwrites it in place through the
+    // output_* address cells; a null Java result leaves the inserted default untouched.
+    Status get(IColumn& to, const DataTypePtr& result_type) const {
+        to.insert_default();
+        JNIEnv* env = nullptr;
+        RETURN_NOT_OK_STATUS_WITH_WARN(JniUtil::GetJNIEnv(&env), "Java-Udaf get value function");
+        const jlong row = static_cast<jlong>(to.size() - 1);
+
+        // Publishes the buffers of `data_col` (the nested column for nullable results) to the
+        // executor and runs getValue(row) exactly once on success.
+        auto evaluate = [&](IColumn& data_col) -> Status {
+            if (data_col.is_column_string()) {
+                const ColumnString* str_col = check_and_get_column<ColumnString>(data_col);
+                auto& chars = const_cast<ColumnString::Chars&>(str_col->get_chars());
+                auto& offsets = const_cast<ColumnString::Offsets&>(str_col->get_offsets());
+                *output_offsets_ptr = reinterpret_cast<int64_t>(offsets.data());
+                // getValue() returns false while the string does not fit into `chars`;
+                // grow the buffer and retry.
+                for (int grow_step = 0;; ++grow_step) {
+                    // Re-publish the data pointer on every attempt: resize() may reallocate.
+                    *output_value_buffer = reinterpret_cast<int64_t>(chars.data());
+                    *output_intermediate_state_ptr = chars.size();
+                    jboolean res = env->CallNonvirtualBooleanMethod(executor_obj, executor_cl,
+                                                                    executor_result_id, row);
+                    // A thrown Java exception yields no valid result; surface it instead of
+                    // growing the buffer forever.
+                    RETURN_ERROR_IF_EXC(env);
+                    if (res == JNI_TRUE) {
+                        break;
+                    }
+                    chars.resize(chars.size() + IncreaseReservedBufferSize(grow_step));
+                }
+                // Drop the unused tail of a grown buffer so chars ends at this row's offset.
+                chars.resize(offsets[row]);
+            } else if (data_col.is_numeric() || data_col.is_column_decimal()) {
+                *output_value_buffer = reinterpret_cast<int64_t>(data_col.get_raw_data().data);
+                env->CallNonvirtualBooleanMethod(executor_obj, executor_cl, executor_result_id,
+                                                 row);
+                RETURN_ERROR_IF_EXC(env);
+            } else {
+                return Status::InvalidArgument(strings::Substitute(
+                        "Java UDAF doesn't support return type is $0 now !", result_type->get_name()));
+            }
+            return Status::OK();
+        };
+
+        if (result_type->is_nullable()) {
+            auto& nullable = assert_cast<ColumnNullable&>(to);
+            *output_null_value =
+                    reinterpret_cast<int64_t>(nullable.get_null_map_column().get_raw_data().data);
+            return evaluate(nullable.get_nested_column());
+        }
+        // -1 tells the executor that the result column has no null map.
+        *output_null_value = -1;
+        return evaluate(to);
+    }
+
+    static const int32_t INITIAL_RESERVED_BUFFER_SIZE = 1024;
+    // Largest shift keeping INITIAL_RESERVED_BUFFER_SIZE << n inside int32_t (1024 << 20 == 1 GiB).
+    static const int MAX_RESERVED_BUFFER_SHIFT = 20;
+    // TODO: we need a heuristic strategy to increase buffer size for variable-size output.
+    // Returns the n-th growth step (1 KiB, 2 KiB, 4 KiB, ...), capped at 1 GiB: an unclamped
+    // shift overflows int32_t once n reaches 21.
+    static inline int32_t IncreaseReservedBufferSize(int n) {
+        const int shift = n < MAX_RESERVED_BUFFER_SHIFT ? n : MAX_RESERVED_BUFFER_SHIFT;
+        return INITIAL_RESERVED_BUFFER_SIZE << shift;
+    }
+
+    // Resolves the executor's method ids and the `serializeLength` field id on executor_cl.
+    // Returns the first lookup failure as an error Status.
+    Status register_func_id(JNIEnv* env) {
+        // Looks up one method and converts a pending JNI exception into an InternalError.
+        auto lookup_method = [&](const char* name, const char* signature,
+                                 jmethodID& out_id) -> Status {
+            out_id = env->GetMethodID(executor_cl, name, signature);
+            Status jni_status = JniUtil::GetJniExceptionMsg(env);
+            if (jni_status.ok()) {
+                return jni_status;
+            }
+            return Status::InternalError(
+                    strings::Substitute("Java-Udaf register_func_id meet error and error is $0",
+                                        jni_status.get_error_msg()));
+        };
+
+        struct MethodSpec {
+            const char* name;
+            const char* signature;
+            jmethodID* id;
+        };
+        // Resolved in this order; the first failure aborts registration.
+        const MethodSpec specs[] = {
+                {"<init>", UDAF_EXECUTOR_CTOR_SIGNATURE, &executor_ctor_id},
+                {"add", UDAF_EXECUTOR_ADD_SIGNATURE, &executor_add_id},
+                {"close", UDAF_EXECUTOR_CLOSE_SIGNATURE, &executor_close_id},
+                {"merge", UDAF_EXECUTOR_MERGE_SIGNATURE, &executor_merge_id},
+                {"serialize", UDAF_EXECUTOR_SERIALIZE_SIGNATURE, &executor_serialize_id},
+                {"getValue", UDAF_EXECUTOR_RESULT_SIGNATURE, &executor_result_id},
+        };
+        for (const MethodSpec& spec : specs) {
+            RETURN_IF_ERROR(lookup_method(spec.name, spec.signature, *spec.id));
+        }
+
+        serialize_len_id = env->GetFieldID(executor_cl, "serializeLength", "J");
+        RETURN_ERROR_IF_EXC(env);
+        return Status::OK();
+    }
+
+public:
+    // JNI handles for the Java executor class and instance.
+    jclass executor_cl;
+    jobject executor_obj;
+    jmethodID executor_ctor_id;
+
+    // Executor method ids and the `serializeLength` field id, filled by register_func_id().
+    jmethodID executor_add_id;
+    jmethodID executor_merge_id;
+    jmethodID executor_serialize_id;
+    jmethodID executor_result_id;
+    jmethodID executor_close_id;
+    jfieldID serialize_len_id;
+
+    // Per-argument native addresses (stored as int64) read by the Java executor: the values /
+    // string chars, the null map (-1 when the argument is not nullable) and string offsets.
+    std::unique_ptr<int64_t[]> input_values_buffer_ptr;
+    std::unique_ptr<int64_t[]> input_nulls_buffer_ptr;
+    std::unique_ptr<int64_t[]> input_offsets_ptrs;
+    // Single-slot cells through which get() publishes the output column's buffers:
+    // output_null_value is -1 for a non-nullable result, and output_intermediate_state_ptr
+    // holds the current size of the string chars buffer.
+    std::unique_ptr<int64_t> output_value_buffer;
+    std::unique_ptr<int64_t> output_null_value;
+    std::unique_ptr<int64_t> output_offsets_ptr;
+    std::unique_ptr<int64_t> output_intermediate_state_ptr;
+
+    // Written by write()/restored by read(); merge() skips an rhs that has it set.
+    bool first_init;
+    int argument_size = 0;
+    // Serialized Java state exchanged with the executor's serialize()/merge().
+    std::string serialize_data;
+};
+
+class AggregateJavaUdaf final
+        : public IAggregateFunctionDataHelper<AggregateJavaUdafData, AggregateJavaUdaf> {
+public:
+    AggregateJavaUdaf(const TFunction& fn, const DataTypes& argument_types, const Array& parameters,
+                      const DataTypePtr& return_type)
+            : IAggregateFunctionDataHelper(argument_types, parameters),
+              fn_(fn),
+              return_type_(return_type) {}
+    // Defaulted: member destructors perform all cleanup.
+    ~AggregateJavaUdaf() = default;
+
+    // Factory returning a shared AggregateJavaUdaf for the given UDAF description.
+    static AggregateFunctionPtr create(const TFunction& fn, const DataTypes& argument_types,
+                                       const Array& parameters, const DataTypePtr& return_type) {
+        AggregateFunctionPtr function =
+                std::make_shared<AggregateJavaUdaf>(fn, argument_types, parameters, return_type);
+        return function;
+    }
+
+    // Constructs the per-group state in caller-provided storage, sized by the argument count.
+    void create(AggregateDataPtr __restrict place) const override {
+        const size_t arg_count = argument_types.size();
+        new (place) Data(arg_count);
+    }
+
+    // The UDAF's name as registered in its thrift description.
+    String get_name() const override {
+        return this->fn_.name.function_name;
+    }
+
+    // Result type supplied at construction.
+    DataTypePtr get_return_type() const override {
+        return this->return_type_;
+    }
+
+    // Feeds a single row (`row_num`) to the Java UDAF after making sure the state is initialized.
+    void add(AggregateDataPtr __restrict place, const IColumn** columns, size_t row_num,
+             Arena*) const override {
+        auto& state = this->data(place);
+        Status status = Status::OK();
+        RETURN_IF_STATUS_ERROR(status, state.init_udaf(fn_));
+        state.add(columns, row_num, row_num + 1, argument_types);
+    }
+
+    // Feeds rows [0, batch_size) into one state with a single executor call.
+    void add_batch_single_place(size_t batch_size, AggregateDataPtr place, const IColumn** columns,
+                                Arena* arena) const override {
+        auto& state = this->data(place);
+        Status status = Status::OK();
+        RETURN_IF_STATUS_ERROR(status, state.init_udaf(fn_));
+        state.add(columns, 0, batch_size, argument_types);
+    }
+
+    // No-op: neither this state nor the Java-side state is reset here.
+    // NOTE(review): confirm no caller reuses a place after reset() (e.g. window functions).
+    void reset(AggregateDataPtr place) const override {}
+
+    // Folds the partial state at `rhs` into `place`, initializing `place` first if needed.
+    void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs,
+               Arena*) const override {
+        auto& state = this->data(place);
+        Status status = Status::OK();
+        RETURN_IF_STATUS_ERROR(status, state.init_udaf(fn_));
+        state.merge(this->data(rhs));
+    }
+
+    // Writes the state to `buf`. write() refreshes the state's byte buffer, hence the const_cast.
+    void serialize(ConstAggregateDataPtr __restrict place, BufferWritable& buf) const override {
+        auto& state = this->data(const_cast<AggregateDataPtr>(place));
+        state.write(buf);
+    }
+
+    // Initializes the state at `place`, then restores its serialized bytes from `buf`.
+    void deserialize(AggregateDataPtr __restrict place, BufferReadable& buf,
+                     Arena*) const override {
+        auto& state = this->data(place);
+        Status status = Status::OK();
+        RETURN_IF_STATUS_ERROR(status, state.init_udaf(fn_));
+        state.read(buf);
+    }
+
+    // Appends the final aggregate value of `place` to `to`.
+    void insert_result_into(ConstAggregateDataPtr __restrict place, IColumn& to) const override {
+        const auto& state = this->data(place);
+        state.get(to, return_type_);
+    }
+
+private:
+    TFunction fn_;

Review Comment:
   `_fn`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] HappenLee commented on a diff in pull request #9930: [Vectorized][UDF] support java-udaf

Posted by GitBox <gi...@apache.org>.
HappenLee commented on code in PR #9930:
URL: https://github.com/apache/incubator-doris/pull/9930#discussion_r889645783


##########
fe/java-udf/src/main/java/org/apache/doris/udf/UdafExecutor.java:
##########
@@ -0,0 +1,539 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.udf;
+
+import org.apache.doris.catalog.Type;
+import org.apache.doris.thrift.TJavaUdfExecutorCtorParams;
+import org.apache.doris.udf.UdfExecutor.JavaUdfDataType;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.log4j.Logger;
+import org.apache.thrift.TDeserializer;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TBinaryProtocol;
+
+import java.io.File;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Method;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+
+/**
+ * udaf executor.
+ */
+public class UdafExecutor {
+    public static final String UDAF_CREATE_FUNCTION = "create";
+    public static final String UDAF_DESTORY_FUNCTION = "destroy";
+    public static final String UDAF_ADD_FUNCTION = "add";
+    public static final String UDAF_SERIALIZE_FUNCTION = "serialize";
+    public static final String UDAF_MERGE_FUNCTION = "merge";
+    public static final String UDAF_RESULT_FUNCTION = "getValue";
+    private static final Logger LOG = Logger.getLogger(UdfExecutor.class);
+    private static final TBinaryProtocol.Factory PROTOCOL_FACTORY = new TBinaryProtocol.Factory();
+    private final long inputBufferPtrs;
+    private final long inputNullsPtrs;
+    private final long inputOffsetsPtrs;
+    private final long outputBufferPtr;
+    private final long outputNullPtr;
+    private final long outputOffsetsPtr;
+    private final long outputIntermediateStatePtr;
+    private Object udaf;
+    private HashMap<String, Method> allMethods;
+    private URLClassLoader classLoader;
+    private JavaUdfDataType[] argTypes;
+    private JavaUdfDataType retType;
+    private Object[] inputObjects;
+    private Object[] inputArgs;
+
+    private Object stateObj;
+    private long serializeLength;
+
+    /**
+     * Builds the executor from a thrift-binary encoded {@code TJavaUdfExecutorCtorParams}.
+     * Records the native input/output buffer addresses, runs {@code init} with the function's
+     * location, aggregate symbol, return type and argument types, then creates the state object.
+     *
+     * @throws InternalException if the params cannot be deserialized
+     */
+    public UdafExecutor(byte[] thriftParams) throws Exception {
+        TJavaUdfExecutorCtorParams request = new TJavaUdfExecutorCtorParams();
+        TDeserializer deserializer = new TDeserializer(PROTOCOL_FACTORY);
+        try {
+            deserializer.deserialize(request, thriftParams);
+        } catch (TException ex) {
+            throw new InternalException(ex.getMessage());
+        }
+
+        // Argument types, converted from their thrift representation.
+        int argCount = request.fn.arg_types.size();
+        Type[] parameterTypes = new Type[argCount];
+        for (int idx = 0; idx < argCount; ++idx) {
+            parameterTypes[idx] = Type.fromThrift(request.fn.arg_types.get(idx));
+        }
+
+        // Native addresses shared with the backend.
+        inputBufferPtrs = request.input_buffer_ptrs;
+        inputNullsPtrs = request.input_nulls_ptrs;
+        inputOffsetsPtrs = request.input_offsets_ptrs;
+        outputBufferPtr = request.output_buffer_ptr;
+        outputNullPtr = request.output_null_ptr;
+        outputOffsetsPtr = request.output_offsets_ptr;
+        outputIntermediateStatePtr = request.output_intermediate_state_ptr;
+
+        allMethods = new HashMap<>();
+        String udafClassName = request.fn.aggregate_fn.symbol;
+        String udafJarPath = request.location;
+        Type returnType = UdfUtils.fromThrift(request.fn.ret_type, 0).first;
+        init(udafJarPath, udafClassName, returnType, parameterTypes);
+        stateObj = create();
+    }
+
+    /**
+     * close and invoke destroy function.
+     */
+    public void close() {
+        if (classLoader != null) {
+            try {
+                destroy();
+                classLoader.close();
+            } catch (Exception e) {
+                // Log and ignore.
+                LOG.debug("Error closing the URLClassloader.", e);
+            }
+        }
+        // We are now un-usable (because the class loader has been
+        // closed), so null out allMethods and classLoader.
+        allMethods = null;
+        classLoader = null;
+    }
+
+    /**
+     * Last-resort cleanup: releases the UDAF resources via {@link #close()}.
+     */
+    @Override
+    protected void finalize() throws Throwable {
+        try {
+            close();
+        } finally {
+            super.finalize();
+        }
+    }
+
+    /**
+     * invoke add function, add row in loop [rowStart, rowEnd).
+     */
+    public void add(long rowStart, long rowEnd) throws UdfRuntimeException {
+        try {
+            inputArgs = new Object[argTypes.length + 1];
+            for (long row = rowStart; row < rowEnd; ++row) {
+                allocateInputObjects(row);
+                for (int i = 0; i < argTypes.length; ++i) {
+                    if (UdfUtils.UNSAFE.getLong(null, UdfUtils.getAddressAtOffset(inputNullsPtrs, i)) == -1
+                            || UdfUtils.UNSAFE.getByte(null,
+                                    UdfUtils.UNSAFE.getLong(null, UdfUtils.getAddressAtOffset(inputNullsPtrs, i)) + row)
+                            == 0) {
+                        inputArgs[i + 1] = inputObjects[i];
+                    } else {
+                        inputArgs[i + 1] = null;
+                    }
+                }
+                inputArgs[0] = stateObj;

Review Comment:
   Does this only need to be set once, i.e. outside the loop?



##########
fe/java-udf/src/main/java/org/apache/doris/udf/UdafExecutor.java:
##########
@@ -0,0 +1,539 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.udf;
+
+import org.apache.doris.catalog.Type;
+import org.apache.doris.thrift.TJavaUdfExecutorCtorParams;
+import org.apache.doris.udf.UdfExecutor.JavaUdfDataType;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.log4j.Logger;
+import org.apache.thrift.TDeserializer;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TBinaryProtocol;
+
+import java.io.File;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Method;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+
+/**
+ * udaf executor.
+ */
+public class UdafExecutor {
+    public static final String UDAF_CREATE_FUNCTION = "create";
+    public static final String UDAF_DESTORY_FUNCTION = "destroy";
+    public static final String UDAF_ADD_FUNCTION = "add";
+    public static final String UDAF_SERIALIZE_FUNCTION = "serialize";
+    public static final String UDAF_MERGE_FUNCTION = "merge";
+    public static final String UDAF_RESULT_FUNCTION = "getValue";
+    private static final Logger LOG = Logger.getLogger(UdfExecutor.class);
+    private static final TBinaryProtocol.Factory PROTOCOL_FACTORY = new TBinaryProtocol.Factory();
+    private final long inputBufferPtrs;
+    private final long inputNullsPtrs;
+    private final long inputOffsetsPtrs;
+    private final long outputBufferPtr;
+    private final long outputNullPtr;
+    private final long outputOffsetsPtr;
+    private final long outputIntermediateStatePtr;
+    private Object udaf;
+    private HashMap<String, Method> allMethods;
+    private URLClassLoader classLoader;
+    private JavaUdfDataType[] argTypes;
+    private JavaUdfDataType retType;
+    private Object[] inputObjects;
+    private Object[] inputArgs;
+
+    private Object stateObj;
+    private long serializeLength;
+
+    /**
+     * Builds the executor from a thrift-binary encoded {@code TJavaUdfExecutorCtorParams}.
+     * Records the native input/output buffer addresses, runs {@code init} with the function's
+     * location, aggregate symbol, return type and argument types, then creates the state object.
+     *
+     * @throws InternalException if the params cannot be deserialized
+     */
+    public UdafExecutor(byte[] thriftParams) throws Exception {
+        TJavaUdfExecutorCtorParams request = new TJavaUdfExecutorCtorParams();
+        TDeserializer deserializer = new TDeserializer(PROTOCOL_FACTORY);
+        try {
+            deserializer.deserialize(request, thriftParams);
+        } catch (TException ex) {
+            throw new InternalException(ex.getMessage());
+        }
+
+        // Argument types, converted from their thrift representation.
+        int argCount = request.fn.arg_types.size();
+        Type[] parameterTypes = new Type[argCount];
+        for (int idx = 0; idx < argCount; ++idx) {
+            parameterTypes[idx] = Type.fromThrift(request.fn.arg_types.get(idx));
+        }
+
+        // Native addresses shared with the backend.
+        inputBufferPtrs = request.input_buffer_ptrs;
+        inputNullsPtrs = request.input_nulls_ptrs;
+        inputOffsetsPtrs = request.input_offsets_ptrs;
+        outputBufferPtr = request.output_buffer_ptr;
+        outputNullPtr = request.output_null_ptr;
+        outputOffsetsPtr = request.output_offsets_ptr;
+        outputIntermediateStatePtr = request.output_intermediate_state_ptr;
+
+        allMethods = new HashMap<>();
+        String udafClassName = request.fn.aggregate_fn.symbol;
+        String udafJarPath = request.location;
+        Type returnType = UdfUtils.fromThrift(request.fn.ret_type, 0).first;
+        init(udafJarPath, udafClassName, returnType, parameterTypes);
+        stateObj = create();
+    }
+
+    /**
+     * close and invoke destroy function.
+     */
+    public void close() {
+        if (classLoader != null) {
+            try {
+                destroy();
+                classLoader.close();
+            } catch (Exception e) {
+                // Log and ignore.
+                LOG.debug("Error closing the URLClassloader.", e);
+            }
+        }
+        // We are now un-usable (because the class loader has been
+        // closed), so null out allMethods and classLoader.
+        allMethods = null;
+        classLoader = null;
+    }
+
+    /**
+     * Last-resort cleanup: releases the UDAF resources via {@link #close()}.
+     */
+    @Override
+    protected void finalize() throws Throwable {
+        try {
+            close();
+        } finally {
+            super.finalize();
+        }
+    }
+
+    /**
+     * invoke add function, add row in loop [rowStart, rowEnd).
+     */
+    public void add(long rowStart, long rowEnd) throws UdfRuntimeException {
+        try {
+            inputArgs = new Object[argTypes.length + 1];
+            for (long row = rowStart; row < rowEnd; ++row) {
+                allocateInputObjects(row);
+                for (int i = 0; i < argTypes.length; ++i) {
+                    if (UdfUtils.UNSAFE.getLong(null, UdfUtils.getAddressAtOffset(inputNullsPtrs, i)) == -1
+                            || UdfUtils.UNSAFE.getByte(null,
+                                    UdfUtils.UNSAFE.getLong(null, UdfUtils.getAddressAtOffset(inputNullsPtrs, i)) + row)
+                            == 0) {
+                        inputArgs[i + 1] = inputObjects[i];
+                    } else {
+                        inputArgs[i + 1] = null;
+                    }
+                }
+                inputArgs[0] = stateObj;
+                allMethods.get(UDAF_ADD_FUNCTION).invoke(udaf, inputArgs);
+            }
+        } catch (Exception e) {
+            throw new UdfRuntimeException("UDAF failed to add: ", e);
+        }
+    }
+
+    /**
+     * Invokes the user's zero-argument create() and returns the new aggregation state object.
+     *
+     * @throws UdfRuntimeException if the invocation fails
+     */
+    public Object create() throws UdfRuntimeException {
+        try {
+            // Plain varargs call: passing a bare null here is an inexact varargs argument.
+            return allMethods.get(UDAF_CREATE_FUNCTION).invoke(udaf);
+        } catch (Exception e) {
+            throw new UdfRuntimeException("UDAF failed to create: ", e);
+        }
+    }
+
+    /**
+     * invoke destroy before colse.
+     */
+    public void destroy() throws UdfRuntimeException {
+        try {
+            allMethods.get(UDAF_DESTORY_FUNCTION).invoke(udaf, stateObj);
+        } catch (Exception e) {
+            throw new UdfRuntimeException("UDAF failed to destroy: ", e);
+        }
+    }
+
+    /**
+     * invoke serialize function into buf.
+     */
+    public void serialize(Object buf) throws UdfRuntimeException {
+        try {
+            Object[] args = new Object[2];
+            args[0] = stateObj;
+            args[1] = (ByteBuffer) buf;
+            allMethods.get(UDAF_SERIALIZE_FUNCTION).invoke(udaf, args);
+        } catch (Exception e) {
+            throw new UdfRuntimeException("UDAF failed to serialize: ", e);
+        }
+    }
+
+    /**
+     * invoke merge function and it's have done deserialze.
+     */
+    public void merge(Object buf) throws UdfRuntimeException {
+        try {
+            Object[] args = new Object[2];
+            args[0] = stateObj;
+            args[1] = (ByteBuffer) buf;
+            allMethods.get(UDAF_MERGE_FUNCTION).invoke(udaf, args);
+        } catch (Exception e) {
+            throw new UdfRuntimeException("UDAF failed to merge: ", e);
+        }
+    }
+
+    /**
+     * invoke getValue to return finally result.
+     */
+    public boolean getValue(long row) throws UdfRuntimeException {
+        try {
+            return storeUdfResult(allMethods.get(UDAF_RESULT_FUNCTION).invoke(udaf, stateObj), row);
+        } catch (Exception e) {
+            throw new UdfRuntimeException("UDAF failed to result", e);
+        }
+    }
+
+    /**
+     * Writes one getValue() result into the backend's output column at index {@code row}, using
+     * the native addresses stored at the output* pointer cells.
+     *
+     * @return {@code false} only for a string result that does not fit into the current chars
+     *     buffer (the backend grows it and retries); {@code true} otherwise
+     * @throws UdfRuntimeException for an unsupported return type or an oversized string offset
+     */
+    private boolean storeUdfResult(Object obj, long row) throws UdfRuntimeException {
+        if (obj == null) {
+            //if result is null, because we have insert default before, so return true directly
+            return true;
+        }
+        long nullMapAddr = UdfUtils.UNSAFE.getLong(null, outputNullPtr);
+        if (nullMapAddr != -1) {
+            UdfUtils.UNSAFE.putByte(nullMapAddr + row, (byte) 0);
+        }
+        // Address of slot `row` in the fixed-width value buffer.
+        long valueAddr = UdfUtils.UNSAFE.getLong(null, outputBufferPtr) + row * retType.getLen();
+        switch (retType) {
+            case BOOLEAN: {
+                boolean val = (boolean) obj;
+                UdfUtils.UNSAFE.putByte(valueAddr, val ? (byte) 1 : 0);
+                return true;
+            }
+            case TINYINT: {
+                UdfUtils.UNSAFE.putByte(valueAddr, (byte) obj);
+                return true;
+            }
+            case SMALLINT: {
+                UdfUtils.UNSAFE.putShort(valueAddr, (short) obj);
+                return true;
+            }
+            case INT: {
+                UdfUtils.UNSAFE.putInt(valueAddr, (int) obj);
+                return true;
+            }
+            case BIGINT: {
+                UdfUtils.UNSAFE.putLong(valueAddr, (long) obj);
+                return true;
+            }
+            case FLOAT: {
+                UdfUtils.UNSAFE.putFloat(valueAddr, (float) obj);
+                return true;
+            }
+            case DOUBLE: {
+                UdfUtils.UNSAFE.putDouble(valueAddr, (double) obj);
+                return true;
+            }
+            case DATE: {
+                LocalDate date = (LocalDate) obj;
+                long time =
+                        UdfExecutor.convertDateTimeToLong(date.getYear(), date.getMonthValue(), date.getDayOfMonth(), 0,
+                                0, 0, true);
+                UdfUtils.UNSAFE.putLong(valueAddr, time);
+                return true;
+            }
+            case DATETIME: {
+                LocalDateTime date = (LocalDateTime) obj;
+                long time =
+                        UdfExecutor.convertDateTimeToLong(date.getYear(), date.getMonthValue(), date.getDayOfMonth(),
+                                date.getHour(), date.getMinute(), date.getSecond(), false);
+                UdfUtils.UNSAFE.putLong(valueAddr, time);
+                return true;
+            }
+            case LARGEINT: {
+                //here value is 16 bytes, so if result data greater than the maximum of 16 bytes
+                //it will return a wrong num to backend;
+                storeInt128((BigInteger) obj, valueAddr);
+                return true;
+            }
+            case DECIMALV2: {
+                //TODO: here is maybe overflow also, and may find a better way to handle
+                storeInt128(((BigDecimal) obj).unscaledValue(), valueAddr);
+                return true;
+            }
+            case CHAR:
+            case VARCHAR:
+            case STRING: {
+                long bufferSize = UdfUtils.UNSAFE.getLong(null, outputIntermediateStatePtr);
+                byte[] bytes = ((String) obj).getBytes(StandardCharsets.UTF_8);
+                long charsAddr = UdfUtils.UNSAFE.getLong(null, outputBufferPtr);
+                long offsetSlotAddr = UdfUtils.UNSAFE.getLong(null, outputOffsetsPtr) + 4L * row;
+
+                long offset = Integer.toUnsignedLong(UdfUtils.UNSAFE.getInt(null, offsetSlotAddr));
+                if (offset + bytes.length > bufferSize) {
+                    return false;
+                }
+                offset += bytes.length;
+                // Offsets are stored as unsigned 32-bit values.
+                if (offset > 0xFFFFFFFFL) {
+                    throw new UdfRuntimeException("String result offset out of range: " + offset);
+                }
+                // Terminating zero byte of this row. putByte writes exactly one byte; putChar would
+                // write two and touch the byte at `offset`, which may lie past the reserved buffer.
+                UdfUtils.UNSAFE.putByte(charsAddr + offset - 1, (byte) UdfUtils.END_OF_STRING);
+                UdfUtils.UNSAFE.putInt(null, offsetSlotAddr, (int) offset);
+                UdfUtils.copyMemory(bytes, UdfUtils.BYTE_ARRAY_OFFSET, null,
+                        charsAddr + offset - bytes.length - 1, bytes.length);
+                return true;
+            }
+            default:
+                throw new UdfRuntimeException("Unsupported return type: " + retType);
+        }
+    }
+
+    /**
+     * Copies {@code data} into the 16 bytes at native address {@code addr}. The byte order is
+     * whatever {@code UdfExecutor.convertByteOrder} produces from {@code toByteArray()}. Negative
+     * values are sign-extended with 0xFF, and inputs longer than 16 bytes are truncated.
+     */
+    private static void storeInt128(BigInteger data, long addr) {
+        byte[] bytes = UdfExecutor.convertByteOrder(data.toByteArray());
+        byte[] value = new byte[16];
+        if (data.signum() == -1) {
+            Arrays.fill(value, (byte) -1);
+        }
+        System.arraycopy(bytes, 0, value, 0, Math.min(bytes.length, value.length));
+        UdfUtils.copyMemory(value, UdfUtils.BYTE_ARRAY_OFFSET, null, addr, value.length);
+    }
+
+    private void allocateInputObjects(long row) throws UdfRuntimeException {
+        inputObjects = new Object[argTypes.length];

Review Comment:
   why not return inputObjects?



##########
fe/java-udf/src/main/java/org/apache/doris/udf/UdafExecutor.java:
##########
@@ -0,0 +1,539 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.udf;
+
+import org.apache.doris.catalog.Type;
+import org.apache.doris.thrift.TJavaUdfExecutorCtorParams;
+import org.apache.doris.udf.UdfExecutor.JavaUdfDataType;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.log4j.Logger;
+import org.apache.thrift.TDeserializer;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TBinaryProtocol;
+
+import java.io.File;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Method;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+
+/**
+ * udaf executor.
+ */
+public class UdafExecutor {
+    public static final String UDAF_CREATE_FUNCTION = "create";
+    public static final String UDAF_DESTORY_FUNCTION = "destroy";
+    public static final String UDAF_ADD_FUNCTION = "add";
+    public static final String UDAF_SERIALIZE_FUNCTION = "serialize";
+    public static final String UDAF_MERGE_FUNCTION = "merge";
+    public static final String UDAF_RESULT_FUNCTION = "getValue";
+    private static final Logger LOG = Logger.getLogger(UdfExecutor.class);
+    private static final TBinaryProtocol.Factory PROTOCOL_FACTORY = new TBinaryProtocol.Factory();
+    private final long inputBufferPtrs;
+    private final long inputNullsPtrs;
+    private final long inputOffsetsPtrs;
+    private final long outputBufferPtr;
+    private final long outputNullPtr;
+    private final long outputOffsetsPtr;
+    private final long outputIntermediateStatePtr;
+    private Object udaf;
+    private HashMap<String, Method> allMethods;
+    private URLClassLoader classLoader;
+    private JavaUdfDataType[] argTypes;
+    private JavaUdfDataType retType;
+    private Object[] inputObjects;
+    private Object[] inputArgs;
+
+    private Object stateObj;
+    private long serializeLength;
+
+    /**
+     * Constructor to create an object.
+     */
+    public UdafExecutor(byte[] thriftParams) throws Exception {
+        TJavaUdfExecutorCtorParams request = new TJavaUdfExecutorCtorParams();
+        TDeserializer deserializer = new TDeserializer(PROTOCOL_FACTORY);
+        try {
+            deserializer.deserialize(request, thriftParams);
+        } catch (TException e) {
+            throw new InternalException(e.getMessage());
+        }
+        Type[] parameterTypes = new Type[request.fn.arg_types.size()];
+        for (int i = 0; i < request.fn.arg_types.size(); ++i) {
+            parameterTypes[i] = Type.fromThrift(request.fn.arg_types.get(i));
+        }
+        inputBufferPtrs = request.input_buffer_ptrs;
+        inputNullsPtrs = request.input_nulls_ptrs;
+        inputOffsetsPtrs = request.input_offsets_ptrs;
+
+        outputBufferPtr = request.output_buffer_ptr;
+        outputNullPtr = request.output_null_ptr;
+        outputOffsetsPtr = request.output_offsets_ptr;
+        outputIntermediateStatePtr = request.output_intermediate_state_ptr;
+        allMethods = new HashMap<>();
+        String className = request.fn.aggregate_fn.symbol;
+        String jarFile = request.location;
+        Type retType = UdfUtils.fromThrift(request.fn.ret_type, 0).first;
+        init(jarFile, className, retType, parameterTypes);
+        stateObj = create();
+    }
+
+    /**
+     * close and invoke destroy function.
+     */
+    public void close() {
+        if (classLoader != null) {
+            try {
+                destroy();
+                classLoader.close();
+            } catch (Exception e) {
+                // Log and ignore.
+                LOG.debug("Error closing the URLClassloader.", e);
+            }
+        }
+        // We are now un-usable (because the class loader has been
+        // closed), so null out allMethods and classLoader.
+        allMethods = null;
+        classLoader = null;
+    }
+
+    @Override
+    protected void finalize() throws Throwable {
+        close();
+        super.finalize();
+    }
+
+    /**
+     * invoke add function, add row in loop [rowStart, rowEnd).
+     */
+    public void add(long rowStart, long rowEnd) throws UdfRuntimeException {
+        try {
+            inputArgs = new Object[argTypes.length + 1];
+            for (long row = rowStart; row < rowEnd; ++row) {
+                allocateInputObjects(row);
+                for (int i = 0; i < argTypes.length; ++i) {
+                    if (UdfUtils.UNSAFE.getLong(null, UdfUtils.getAddressAtOffset(inputNullsPtrs, i)) == -1
+                            || UdfUtils.UNSAFE.getByte(null,
+                                    UdfUtils.UNSAFE.getLong(null, UdfUtils.getAddressAtOffset(inputNullsPtrs, i)) + row)
+                            == 0) {
+                        inputArgs[i + 1] = inputObjects[i];
+                    } else {
+                        inputArgs[i + 1] = null;
+                    }
+                }
+                inputArgs[0] = stateObj;
+                allMethods.get(UDAF_ADD_FUNCTION).invoke(udaf, inputArgs);
+            }
+        } catch (Exception e) {
+            throw new UdfRuntimeException("UDAF failed to add: ", e);
+        }
+    }
+
+    /**
+     * invoke user create function to get obj.
+     */
+    public Object create() throws UdfRuntimeException {
+        try {
+            return allMethods.get(UDAF_CREATE_FUNCTION).invoke(udaf, null);
+        } catch (Exception e) {
+            throw new UdfRuntimeException("UDAF failed to create: ", e);
+        }
+    }
+
+    /**
+     * invoke destroy before colse.
+     */
+    public void destroy() throws UdfRuntimeException {
+        try {
+            allMethods.get(UDAF_DESTORY_FUNCTION).invoke(udaf, stateObj);
+        } catch (Exception e) {
+            throw new UdfRuntimeException("UDAF failed to destroy: ", e);
+        }
+    }
+
+    /**
+     * invoke serialize function into buf.
+     */
+    public void serialize(Object buf) throws UdfRuntimeException {
+        try {
+            Object[] args = new Object[2];
+            args[0] = stateObj;
+            args[1] = (ByteBuffer) buf;
+            allMethods.get(UDAF_SERIALIZE_FUNCTION).invoke(udaf, args);
+        } catch (Exception e) {
+            throw new UdfRuntimeException("UDAF failed to serialize: ", e);
+        }
+    }
+
+    /**
+     * invoke merge function and it's have done deserialze.
+     */
+    public void merge(Object buf) throws UdfRuntimeException {
+        try {
+            Object[] args = new Object[2];
+            args[0] = stateObj;
+            args[1] = (ByteBuffer) buf;
+            allMethods.get(UDAF_MERGE_FUNCTION).invoke(udaf, args);
+        } catch (Exception e) {
+            throw new UdfRuntimeException("UDAF failed to merge: ", e);
+        }
+    }
+
+    /**
+     * invoke getValue to return finally result.
+     */
+    public boolean getValue(long row) throws UdfRuntimeException {
+        try {
+            return storeUdfResult(allMethods.get(UDAF_RESULT_FUNCTION).invoke(udaf, stateObj), row);
+        } catch (Exception e) {
+            throw new UdfRuntimeException("UDAF failed to result", e);
+        }
+    }
+
+    private boolean storeUdfResult(Object obj, long row) throws UdfRuntimeException {
+        if (obj == null) {
+            //if result is null, because we have insert default before, so return true directly
+            return true;
+        }
+        if (UdfUtils.UNSAFE.getLong(null, outputNullPtr) != -1) {
+            UdfUtils.UNSAFE.putByte(UdfUtils.UNSAFE.getLong(null, outputNullPtr) + row, (byte) 0);
+        }
+        switch (retType) {
+            case BOOLEAN: {
+                boolean val = (boolean) obj;
+                UdfUtils.UNSAFE.putByte(UdfUtils.UNSAFE.getLong(null, outputBufferPtr) + row * retType.getLen(),
+                        val ? (byte) 1 : 0);
+                return true;
+            }
+            case TINYINT: {
+                UdfUtils.UNSAFE.putByte(UdfUtils.UNSAFE.getLong(null, outputBufferPtr) + row * retType.getLen(),
+                        (byte) obj);
+                return true;
+            }
+            case SMALLINT: {
+                UdfUtils.UNSAFE.putShort(UdfUtils.UNSAFE.getLong(null, outputBufferPtr) + row * retType.getLen(),
+                        (short) obj);
+                return true;
+            }
+            case INT: {
+                UdfUtils.UNSAFE.putInt(UdfUtils.UNSAFE.getLong(null, outputBufferPtr) + row * retType.getLen(),
+                        (int) obj);
+                return true;
+            }
+            case BIGINT: {
+                UdfUtils.UNSAFE.putLong(UdfUtils.UNSAFE.getLong(null, outputBufferPtr) + row * retType.getLen(),
+                        (long) obj);
+                return true;
+            }
+            case FLOAT: {
+                UdfUtils.UNSAFE.putFloat(UdfUtils.UNSAFE.getLong(null, outputBufferPtr) + row * retType.getLen(),
+                        (float) obj);
+                return true;
+            }
+            case DOUBLE: {
+                UdfUtils.UNSAFE.putDouble(UdfUtils.UNSAFE.getLong(null, outputBufferPtr) + row * retType.getLen(),
+                        (double) obj);
+                return true;
+            }
+            case DATE: {
+                LocalDate date = (LocalDate) obj;
+                long time =
+                        UdfExecutor.convertDateTimeToLong(date.getYear(), date.getMonthValue(), date.getDayOfMonth(), 0,
+                                0, 0, true);
+                UdfUtils.UNSAFE.putLong(UdfUtils.UNSAFE.getLong(null, outputBufferPtr) + row * retType.getLen(), time);
+                return true;
+            }
+            case DATETIME: {
+                LocalDateTime date = (LocalDateTime) obj;
+                long time =
+                        UdfExecutor.convertDateTimeToLong(date.getYear(), date.getMonthValue(), date.getDayOfMonth(),
+                                date.getHour(), date.getMinute(), date.getSecond(), false);
+                UdfUtils.UNSAFE.putLong(UdfUtils.UNSAFE.getLong(null, outputBufferPtr) + row * retType.getLen(), time);
+                return true;
+            }
+            case LARGEINT: {
+                BigInteger data = (BigInteger) obj;
+                byte[] bytes = UdfExecutor.convertByteOrder(data.toByteArray());
+
+                //here value is 16 bytes, so if result data greater than the maximum of 16 bytes
+                //it will return a wrong num to backend;
+                byte[] value = new byte[16];
+                //check data is negative
+                if (data.signum() == -1) {
+                    Arrays.fill(value, (byte) -1);
+                }
+                for (int index = 0; index < Math.min(bytes.length, value.length); ++index) {
+                    value[index] = bytes[index];
+                }
+
+                UdfUtils.copyMemory(value, UdfUtils.BYTE_ARRAY_OFFSET, null,
+                        UdfUtils.UNSAFE.getLong(null, outputBufferPtr) + row * retType.getLen(), value.length);
+                return true;
+            }
+            case DECIMALV2: {
+                BigInteger data = ((BigDecimal) obj).unscaledValue();
+                byte[] bytes = UdfExecutor.convertByteOrder(data.toByteArray());
+                //TODO: here is maybe overflow also, and may find a better way to handle
+                byte[] value = new byte[16];
+                if (data.signum() == -1) {
+                    Arrays.fill(value, (byte) -1);
+                }
+
+                for (int index = 0; index < Math.min(bytes.length, value.length); ++index) {
+                    value[index] = bytes[index];
+                }
+
+                UdfUtils.copyMemory(value, UdfUtils.BYTE_ARRAY_OFFSET, null,
+                        UdfUtils.UNSAFE.getLong(null, outputBufferPtr) + row * retType.getLen(), value.length);
+                return true;
+            }
+            case CHAR:
+            case VARCHAR:
+            case STRING:
+                long bufferSize = UdfUtils.UNSAFE.getLong(null, outputIntermediateStatePtr);
+                byte[] bytes = ((String) obj).getBytes(StandardCharsets.UTF_8);
+
+                long offset = Integer.toUnsignedLong(
+                        UdfUtils.UNSAFE.getInt(null, UdfUtils.UNSAFE.getLong(null, outputOffsetsPtr) + 4L * row));
+                if (offset + bytes.length > bufferSize) {
+                    return false;
+                }
+                offset += bytes.length;
+                UdfUtils.UNSAFE.putChar(UdfUtils.UNSAFE.getLong(null, outputBufferPtr) + offset - 1,
+                        UdfUtils.END_OF_STRING);
+                UdfUtils.UNSAFE.putInt(null, UdfUtils.UNSAFE.getLong(null, outputOffsetsPtr) + 4L * row,
+                        Integer.parseUnsignedInt(String.valueOf(offset)));
+                UdfUtils.copyMemory(bytes, UdfUtils.BYTE_ARRAY_OFFSET, null,
+                        UdfUtils.UNSAFE.getLong(null, outputBufferPtr) + offset - bytes.length - 1, bytes.length);
+                return true;
+            default:
+                throw new UdfRuntimeException("Unsupported return type: " + retType);
+        }
+    }
+
+    private void allocateInputObjects(long row) throws UdfRuntimeException {
+        inputObjects = new Object[argTypes.length];
+
+        for (int i = 0; i < argTypes.length; ++i) {
+            switch (argTypes[i]) {
+                case BOOLEAN:
+                    inputObjects[i] = UdfUtils.UNSAFE.getBoolean(null,
+                            UdfUtils.UNSAFE.getLong(null, UdfUtils.getAddressAtOffset(inputBufferPtrs, i)) + row);
+                    break;
+                case TINYINT:
+                    inputObjects[i] = UdfUtils.UNSAFE.getByte(null,
+                            UdfUtils.UNSAFE.getLong(null, UdfUtils.getAddressAtOffset(inputBufferPtrs, i)) + row);
+                    break;
+                case SMALLINT:
+                    inputObjects[i] = UdfUtils.UNSAFE.getShort(null,
+                            UdfUtils.UNSAFE.getLong(null, UdfUtils.getAddressAtOffset(inputBufferPtrs, i)) + 2L * row);
+                    break;
+                case INT:
+                    inputObjects[i] = UdfUtils.UNSAFE.getInt(null,
+                            UdfUtils.UNSAFE.getLong(null, UdfUtils.getAddressAtOffset(inputBufferPtrs, i)) + 4L * row);
+                    break;
+                case BIGINT:
+                    inputObjects[i] = UdfUtils.UNSAFE.getLong(null,
+                            UdfUtils.UNSAFE.getLong(null, UdfUtils.getAddressAtOffset(inputBufferPtrs, i)) + 8L * row);
+                    break;
+                case FLOAT:
+                    inputObjects[i] = UdfUtils.UNSAFE.getFloat(null,
+                            UdfUtils.UNSAFE.getLong(null, UdfUtils.getAddressAtOffset(inputBufferPtrs, i)) + 4L * row);
+                    break;
+                case DOUBLE:
+                    inputObjects[i] = UdfUtils.UNSAFE.getDouble(null,
+                            UdfUtils.UNSAFE.getLong(null, UdfUtils.getAddressAtOffset(inputBufferPtrs, i)) + 8L * row);
+                    break;
+                case DATE: {
+                    long data = UdfUtils.UNSAFE.getLong(null,
+                            UdfUtils.UNSAFE.getLong(null, UdfUtils.getAddressAtOffset(inputBufferPtrs, i)) + 8L * row);
+                    inputObjects[i] = UdfExecutor.convertToDate(data);
+                    break;
+                }
+                case DATETIME: {
+                    long data = UdfUtils.UNSAFE.getLong(null,
+                            UdfUtils.UNSAFE.getLong(null, UdfUtils.getAddressAtOffset(inputBufferPtrs, i)) + 8L * row);
+                    inputObjects[i] = UdfExecutor.convertToDateTime(data);
+                    break;
+                }
+                case LARGEINT: {
+                    long base =
+                            UdfUtils.UNSAFE.getLong(null, UdfUtils.getAddressAtOffset(inputBufferPtrs, i)) + 16L * row;
+                    byte[] bytes = new byte[16];
+                    UdfUtils.copyMemory(null, base, bytes, UdfUtils.BYTE_ARRAY_OFFSET, 16);
+
+                    inputObjects[i] = new BigInteger(UdfExecutor.convertByteOrder(bytes));
+                    break;
+                }
+                case DECIMALV2: {
+                    long base =
+                            UdfUtils.UNSAFE.getLong(null, UdfUtils.getAddressAtOffset(inputBufferPtrs, i)) + 16L * row;
+                    byte[] bytes = new byte[16];
+                    UdfUtils.copyMemory(null, base, bytes, UdfUtils.BYTE_ARRAY_OFFSET, 16);
+
+                    BigInteger value = new BigInteger(UdfExecutor.convertByteOrder(bytes));
+                    inputObjects[i] = new BigDecimal(value, 9);
+                    break;
+                }
+                case CHAR:
+                case VARCHAR:
+                case STRING:
+                    long offset = Integer.toUnsignedLong(UdfUtils.UNSAFE.getInt(null,
+                            UdfUtils.UNSAFE.getLong(null, UdfUtils.getAddressAtOffset(inputOffsetsPtrs, i))
+                                    + 4L * row));
+                    long numBytes = row == 0 ? offset - 1 : offset - Integer.toUnsignedLong(UdfUtils.UNSAFE.getInt(null,
+                            UdfUtils.UNSAFE.getLong(null, UdfUtils.getAddressAtOffset(inputOffsetsPtrs, i))
+                                    + 4L * (row - 1))) - 1;
+                    long base =
+                            row == 0 ? UdfUtils.UNSAFE.getLong(null, UdfUtils.getAddressAtOffset(inputBufferPtrs, i)) :
+                                    UdfUtils.UNSAFE.getLong(null, UdfUtils.getAddressAtOffset(inputBufferPtrs, i))
+                                            + offset - numBytes - 1;
+                    byte[] bytes = new byte[(int) numBytes];
+                    UdfUtils.copyMemory(null, base, bytes, UdfUtils.BYTE_ARRAY_OFFSET, numBytes);
+                    inputObjects[i] = new String(bytes, StandardCharsets.UTF_8);
+                    break;
+                default:
+                    throw new UdfRuntimeException("Unsupported argument type: " + argTypes[i]);
+            }
+        }
+    }
+
+    private URLClassLoader getClassLoader(String jarPath) throws MalformedURLException {

Review Comment:
   This is the same as the method in `UdfExecutor`; please keep only one copy.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] BiteTheDDDDt commented on a diff in pull request #9930: [Vectorized][UDF] support java-udaf

Posted by GitBox <gi...@apache.org>.
BiteTheDDDDt commented on code in PR #9930:
URL: https://github.com/apache/incubator-doris/pull/9930#discussion_r887636880


##########
be/src/vec/aggregate_functions/aggregate_function_java_udaf.h:
##########
@@ -0,0 +1,392 @@
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#ifdef LIBJVM
+
+#include <jni.h>
+#include <unistd.h>
+
+#include <cstdint>
+#include <memory>
+
+#include "common/status.h"
+#include "gen_cpp/Exprs_types.h"
+#include "runtime/user_function_cache.h"
+#include "util/jni-util.h"
+#include "vec/aggregate_functions/aggregate_function.h"
+#include "vec/columns/column_string.h"
+#include "vec/common/exception.h"
+#include "vec/common/string_ref.h"
+#include "vec/core/block.h"
+#include "vec/core/column_numbers.h"
+#include "vec/core/field.h"
+#include "vec/core/types.h"
+#include "vec/data_types/data_type_string.h"
+#include "vec/io/io_helper.h"
+
+namespace doris::vectorized {
+
+const char* UDAF_EXECUTOR_CLASS = "org/apache/doris/udf/UdafExecutor";
+const char* UDAF_EXECUTOR_CTOR_SIGNATURE = "([B)V";
+const char* UDAF_EXECUTOR_CLOSE_SIGNATURE = "()V";
+const char* UDAF_EXECUTOR_CREATE_SIGNATURE = "()Ljava/lang/Object;";
+const char* UDAF_EXECUTOR_DESTORY_SIGNATURE = "()V";
+const char* UDAF_EXECUTOR_ADD_SIGNATURE = "(J)V";
+const char* UDAF_EXECUTOR_MERGE_SIGNATURE = "(Ljava/lang/Object;)V";
+const char* UDAF_EXECUTOR_SERIALIZE_SIGNATURE = "(Ljava/lang/Object;)V";
+const char* UDAF_EXECUTOR_DESERIALIZE_SIGNATURE = "(Ljava/lang/Object;)V";
+const char* UDAF_EXECUTOR_RESULT_SIGNATURE = "(J)Z";
+
+struct AggregateJavaUdafData {
+public:
+    AggregateJavaUdafData() = default;
+    AggregateJavaUdafData(int64_t num_args) {
+        argument_size = num_args;
+        first_init = true;
+        input_values_buffer_ptr.reset(new int64_t[num_args]);
+        input_nulls_buffer_ptr.reset(new int64_t[num_args]);
+        input_offsets_ptrs.reset(new int64_t[num_args]);
+        output_value_buffer.reset(new int64_t);
+        output_null_value.reset(new int64_t);
+        batch_size_ptr.reset(new int32_t);
+        output_offsets_ptr.reset(new int64_t);
+        output_intermediate_state_ptr.reset(new int64_t);
+    }
+
+    ~AggregateJavaUdafData() {
+        JNIEnv* env;
+        Status status;
+        RETURN_IF_STATUS_ERROR(status, JniUtil::GetJNIEnv(&env));
+        env->CallNonvirtualVoidMethod(executor_obj, executor_cl_, executor_close_id_);
+        Status s = JniUtil::GetJniExceptionMsg(env);
+        if (!s.ok()) {
+            LOG(WARNING) << "meet some error in destroy: " << s.get_error_msg();
+        }
+        env->DeleteGlobalRef(executor_obj);
+    }
+
+    Status init_udaf(const TFunction& fn) {
+        if (first_init) {
+            JNIEnv* env = nullptr;
+            RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env));
+            if (env == nullptr) {
+                return Status::InternalError("Failed to get/create JVM");
+            }
+            RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, UDAF_EXECUTOR_CLASS, &executor_cl_));
+
+            Status ret_code = register_func_id(env);
+            if (!ret_code.ok()) {
+                LOG(WARNING) << "register_func_id has error : " << ret_code.get_error_msg();
+            }
+
+            // Add a scoped cleanup jni reference object. This cleans up local refs made below.
+            JniLocalFrame jni_frame;
+            {
+                std::string local_location;
+                auto function_cache = UserFunctionCache::instance();
+                RETURN_IF_ERROR(function_cache->get_jarpath(fn.id, fn.hdfs_location, fn.checksum,
+                                                            &local_location));
+                TJavaUdfExecutorCtorParams ctor_params;
+                ctor_params.__set_fn(fn);
+                ctor_params.__set_location(local_location);
+                ctor_params.__set_input_offsets_ptrs((int64_t)input_offsets_ptrs.get());
+                ctor_params.__set_input_buffer_ptrs((int64_t)input_values_buffer_ptr.get());
+                ctor_params.__set_input_nulls_ptrs((int64_t)input_nulls_buffer_ptr.get());
+                ctor_params.__set_output_buffer_ptr((int64_t)output_value_buffer.get());
+
+                ctor_params.__set_output_null_ptr((int64_t)output_null_value.get());
+                ctor_params.__set_output_offsets_ptr((int64_t)output_offsets_ptr.get());
+                ctor_params.__set_output_intermediate_state_ptr(
+                        (int64_t)output_intermediate_state_ptr.get());
+                ctor_params.__set_batch_size_ptr((int64_t)batch_size_ptr.get());
+
+                jbyteArray ctor_params_bytes;
+
+                // Pushed frame will be popped when jni_frame goes out-of-scope.
+                RETURN_IF_ERROR(jni_frame.push(env));
+                RETURN_IF_ERROR(SerializeThriftMsg(env, &ctor_params, &ctor_params_bytes));
+                executor_obj = env->NewObject(executor_cl_, executor_ctor_id_, ctor_params_bytes);
+            }
+            RETURN_ERROR_IF_EXC(env);
+            RETURN_IF_ERROR(JniUtil::LocalToGlobalRef(env, executor_obj, &executor_obj));
+            first_init = false;
+        }
+        return Status::OK();
+    }
+
+    void add(const IColumn** columns, size_t row_num, const DataTypes& argument_types) const {
+        JNIEnv* env = nullptr;

Review Comment:
   We could wrap this in a helper function such as `JNIEnv* get_env(string when)`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] Gabriel39 commented on a diff in pull request #9930: [Vectorized][UDF] support java-udaf

Posted by GitBox <gi...@apache.org>.
Gabriel39 commented on code in PR #9930:
URL: https://github.com/apache/incubator-doris/pull/9930#discussion_r890782977


##########
be/src/vec/aggregate_functions/aggregate_function_java_udaf.h:
##########
@@ -0,0 +1,363 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#ifdef LIBJVM
+
+#include <jni.h>
+#include <unistd.h>
+
+#include <cstdint>
+#include <memory>
+
+#include "common/status.h"
+#include "gen_cpp/Exprs_types.h"
+#include "runtime/user_function_cache.h"
+#include "util/jni-util.h"
+#include "vec/aggregate_functions/aggregate_function.h"
+#include "vec/columns/column_string.h"
+#include "vec/common/exception.h"
+#include "vec/common/string_ref.h"
+#include "vec/core/block.h"
+#include "vec/core/column_numbers.h"
+#include "vec/core/field.h"
+#include "vec/core/types.h"
+#include "vec/data_types/data_type_string.h"
+#include "vec/io/io_helper.h"
+
+namespace doris::vectorized {
+
+const char* UDAF_EXECUTOR_CLASS = "org/apache/doris/udf/UdafExecutor";
+const char* UDAF_EXECUTOR_CTOR_SIGNATURE = "([B)V";
+const char* UDAF_EXECUTOR_CLOSE_SIGNATURE = "()V";
+const char* UDAF_EXECUTOR_ADD_SIGNATURE = "(JJ)V";
+const char* UDAF_EXECUTOR_SERIALIZE_SIGNATURE = "(Ljava/lang/Object;)V";
+const char* UDAF_EXECUTOR_MERGE_SIGNATURE = "(Ljava/lang/Object;)V";
+const char* UDAF_EXECUTOR_RESULT_SIGNATURE = "(J)Z";
+const char* UDAF_EXECUTOR_SERIALIZE_LENGTH_SIGNATURE = "()J";
+//Calling Java method about those signture means: "(argument-types)return-type"

Review Comment:
   ```suggestion
   // The JNI signatures of the Java methods called above have the form: "(argument-types)return-type"
   ```



##########
be/src/vec/aggregate_functions/aggregate_function_java_udaf.h:
##########
@@ -0,0 +1,363 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#ifdef LIBJVM
+
+#include <jni.h>
+#include <unistd.h>
+
+#include <cstdint>
+#include <memory>
+
+#include "common/status.h"
+#include "gen_cpp/Exprs_types.h"
+#include "runtime/user_function_cache.h"
+#include "util/jni-util.h"
+#include "vec/aggregate_functions/aggregate_function.h"
+#include "vec/columns/column_string.h"
+#include "vec/common/exception.h"
+#include "vec/common/string_ref.h"
+#include "vec/core/block.h"
+#include "vec/core/column_numbers.h"
+#include "vec/core/field.h"
+#include "vec/core/types.h"
+#include "vec/data_types/data_type_string.h"
+#include "vec/io/io_helper.h"
+
+namespace doris::vectorized {
+
+const char* UDAF_EXECUTOR_CLASS = "org/apache/doris/udf/UdafExecutor";
+const char* UDAF_EXECUTOR_CTOR_SIGNATURE = "([B)V";
+const char* UDAF_EXECUTOR_CLOSE_SIGNATURE = "()V";
+const char* UDAF_EXECUTOR_ADD_SIGNATURE = "(JJ)V";
+const char* UDAF_EXECUTOR_SERIALIZE_SIGNATURE = "(Ljava/lang/Object;)V";
+const char* UDAF_EXECUTOR_MERGE_SIGNATURE = "(Ljava/lang/Object;)V";
+const char* UDAF_EXECUTOR_RESULT_SIGNATURE = "(J)Z";
+const char* UDAF_EXECUTOR_SERIALIZE_LENGTH_SIGNATURE = "()J";
+//Calling Java method about those signture means: "(argument-types)return-type"
+//https://www.iitk.ac.in/esc101/05Aug/tutorial/native1.1/implementing/method.html

Review Comment:
   ```suggestion
   // https://www.iitk.ac.in/esc101/05Aug/tutorial/native1.1/implementing/method.html
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] HappenLee merged pull request #9930: [Vectorized][UDF] support java-udaf

Posted by GitBox <gi...@apache.org>.
HappenLee merged PR #9930:
URL: https://github.com/apache/incubator-doris/pull/9930


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] github-actions[bot] commented on pull request #9930: [Vectorized][UDF] support java-udaf

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #9930:
URL: https://github.com/apache/incubator-doris/pull/9930#issuecomment-1155925474

   PR approved by at least one committer and no changes requested.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] HappenLee commented on a diff in pull request #9930: [Vectorized][UDF] support java-udaf

Posted by GitBox <gi...@apache.org>.
HappenLee commented on code in PR #9930:
URL: https://github.com/apache/incubator-doris/pull/9930#discussion_r889466854


##########
be/src/vec/aggregate_functions/aggregate_function_java_udaf.h:
##########
@@ -0,0 +1,361 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#ifdef LIBJVM
+
+#include <jni.h>
+#include <unistd.h>
+
+#include <cstdint>
+#include <memory>
+
+#include "common/status.h"
+#include "gen_cpp/Exprs_types.h"
+#include "runtime/user_function_cache.h"
+#include "util/jni-util.h"
+#include "vec/aggregate_functions/aggregate_function.h"
+#include "vec/columns/column_string.h"
+#include "vec/common/exception.h"
+#include "vec/common/string_ref.h"
+#include "vec/core/block.h"
+#include "vec/core/column_numbers.h"
+#include "vec/core/field.h"
+#include "vec/core/types.h"
+#include "vec/data_types/data_type_string.h"
+#include "vec/io/io_helper.h"
+
+namespace doris::vectorized {
+
+const char* UDAF_EXECUTOR_CLASS = "org/apache/doris/udf/UdafExecutor";
+const char* UDAF_EXECUTOR_CTOR_SIGNATURE = "([B)V";
+const char* UDAF_EXECUTOR_CLOSE_SIGNATURE = "()V";
+const char* UDAF_EXECUTOR_ADD_SIGNATURE = "(JJ)V";
+const char* UDAF_EXECUTOR_SERIALIZE_SIGNATURE = "(Ljava/lang/Object;)V";
+const char* UDAF_EXECUTOR_MERGE_SIGNATURE = "(Ljava/lang/Object;)V";
+const char* UDAF_EXECUTOR_RESULT_SIGNATURE = "(J)Z";
+
+struct AggregateJavaUdafData {
+public:
+    AggregateJavaUdafData() = default;
+    AggregateJavaUdafData(int64_t num_args) {
+        argument_size = num_args;
+        first_init = true;
+        input_values_buffer_ptr.reset(new int64_t[num_args]);
+        input_nulls_buffer_ptr.reset(new int64_t[num_args]);
+        input_offsets_ptrs.reset(new int64_t[num_args]);
+        output_value_buffer.reset(new int64_t);
+        output_null_value.reset(new int64_t);
+        output_offsets_ptr.reset(new int64_t);
+        output_intermediate_state_ptr.reset(new int64_t);
+    }
+
+    ~AggregateJavaUdafData() {
+        JNIEnv* env;
+        Status status;
+        RETURN_IF_STATUS_ERROR(status, JniUtil::GetJNIEnv(&env));
+        env->CallNonvirtualVoidMethod(executor_obj, executor_cl, executor_close_id);
+        RETURN_IF_STATUS_ERROR(status, JniUtil::GetJniExceptionMsg(env));
+        env->DeleteGlobalRef(executor_obj);
+    }
+
+    Status init_udaf(const TFunction& fn) {
+        if (first_init) {
+            JNIEnv* env = nullptr;
+            RETURN_NOT_OK_STATUS_WITH_WARN(JniUtil::GetJNIEnv(&env),
+                                           "Java-Udaf init_udaf function");
+            RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, UDAF_EXECUTOR_CLASS, &executor_cl));
+            RETURN_NOT_OK_STATUS_WITH_WARN(register_func_id(env),
+                                           "Java-Udaf register_func_id function");
+
+            // Add a scoped cleanup jni reference object. This cleans up local refs made below.
+            JniLocalFrame jni_frame;
+            {
+                std::string local_location;
+                auto function_cache = UserFunctionCache::instance();
+                RETURN_IF_ERROR(function_cache->get_jarpath(fn.id, fn.hdfs_location, fn.checksum,
+                                                            &local_location));
+                TJavaUdfExecutorCtorParams ctor_params;
+                ctor_params.__set_fn(fn);
+                ctor_params.__set_location(local_location);
+                ctor_params.__set_input_offsets_ptrs((int64_t)input_offsets_ptrs.get());
+                ctor_params.__set_input_buffer_ptrs((int64_t)input_values_buffer_ptr.get());
+                ctor_params.__set_input_nulls_ptrs((int64_t)input_nulls_buffer_ptr.get());
+                ctor_params.__set_output_buffer_ptr((int64_t)output_value_buffer.get());
+
+                ctor_params.__set_output_null_ptr((int64_t)output_null_value.get());
+                ctor_params.__set_output_offsets_ptr((int64_t)output_offsets_ptr.get());
+                ctor_params.__set_output_intermediate_state_ptr(
+                        (int64_t)output_intermediate_state_ptr.get());
+
+                jbyteArray ctor_params_bytes;
+
+                // Pushed frame will be popped when jni_frame goes out-of-scope.
+                RETURN_IF_ERROR(jni_frame.push(env));
+                RETURN_IF_ERROR(SerializeThriftMsg(env, &ctor_params, &ctor_params_bytes));
+                executor_obj = env->NewObject(executor_cl, executor_ctor_id, ctor_params_bytes);
+            }
+            RETURN_ERROR_IF_EXC(env);
+            RETURN_IF_ERROR(JniUtil::LocalToGlobalRef(env, executor_obj, &executor_obj));
+            first_init = false;
+        }
+        return Status::OK();
+    }
+
+    Status add(const IColumn** columns, size_t row_num_start, size_t row_num_end,
+               const DataTypes& argument_types) {
+        JNIEnv* env = nullptr;
+        RETURN_NOT_OK_STATUS_WITH_WARN(JniUtil::GetJNIEnv(&env), "Java-Udaf add function");
+        for (int arg_idx = 0; arg_idx < argument_size; ++arg_idx) {
+            auto data_col = columns[arg_idx];
+            if (auto* nullable = check_and_get_column<const ColumnNullable>(*columns[arg_idx])) {
+                data_col = nullable->get_nested_column_ptr();
+                auto null_col = check_and_get_column<ColumnVector<UInt8>>(
+                        nullable->get_null_map_column_ptr());
+                input_nulls_buffer_ptr.get()[arg_idx] =
+                        reinterpret_cast<int64_t>(null_col->get_data().data());
+            } else {
+                input_nulls_buffer_ptr.get()[arg_idx] = -1;
+            }
+            if (data_col->is_column_string()) {
+                const ColumnString* str_col = check_and_get_column<ColumnString>(data_col);
+                input_values_buffer_ptr.get()[arg_idx] =
+                        reinterpret_cast<int64_t>(str_col->get_chars().data());
+                input_offsets_ptrs.get()[arg_idx] =
+                        reinterpret_cast<int64_t>(str_col->get_offsets().data());
+            } else if (data_col->is_numeric() || data_col->is_column_decimal()) {
+                input_values_buffer_ptr.get()[arg_idx] =
+                        reinterpret_cast<int64_t>(data_col->get_raw_data().data);
+            } else {
+                return Status::InvalidArgument(
+                        strings::Substitute("Java UDAF doesn't support type is $0 now !",
+                                            argument_types[arg_idx]->get_name()));
+            }
+        }
+        env->CallNonvirtualVoidMethod(executor_obj, executor_cl, executor_add_id, row_num_start,
+                                      row_num_end);
+        return Status::OK();
+    }
+
+    Status merge(const AggregateJavaUdafData& rhs) {
+        if (rhs.first_init) {
+            return Status::OK();
+        }
+        JNIEnv* env = nullptr;
+        RETURN_NOT_OK_STATUS_WITH_WARN(JniUtil::GetJNIEnv(&env), "Java-Udaf merge function");
+        serialize_data = rhs.serialize_data;
+        long len = serialize_data.length();
+        jobject data = env->NewDirectByteBuffer(serialize_data.data(), len);
+        env->CallNonvirtualVoidMethod(executor_obj, executor_cl, executor_merge_id, data);
+        return Status::OK();
+    }
+
+    Status write(BufferWritable& buf) {
+        write_binary(first_init, buf);
+        JNIEnv* env = nullptr;
+        RETURN_NOT_OK_STATUS_WITH_WARN(JniUtil::GetJNIEnv(&env), "Java-Udaf write function");
+        jlong len = env->GetLongField(executor_obj, serialize_len_id);
+        serialize_data.resize(len);
+        jobject data = env->NewDirectByteBuffer(serialize_data.data(), len);
+        env->CallNonvirtualVoidMethod(executor_obj, executor_cl, executor_serialize_id, data);
+        write_binary(serialize_data, buf);
+        return Status::OK();
+    }
+
+    void read(BufferReadable& buf) {
+        read_binary(first_init, buf);
+        read_binary(serialize_data, buf);
+    }
+
+    Status get(IColumn& to, const DataTypePtr& result_type) const {
+        to.insert_default();
+        JNIEnv* env = nullptr;
+        RETURN_NOT_OK_STATUS_WITH_WARN(JniUtil::GetJNIEnv(&env), "Java-Udaf get value function");
+        if (result_type->is_nullable()) {
+            auto& nullable = assert_cast<ColumnNullable&>(to);
+            *output_null_value =
+                    reinterpret_cast<int64_t>(nullable.get_null_map_column().get_raw_data().data);
+            auto& data_col = nullable.get_nested_column();
+
+#ifndef EVALUATE_JAVA_UDAF
+#define EVALUATE_JAVA_UDAF                                                                        \
+    if (data_col.is_column_string()) {                                                            \
+        const ColumnString* str_col = check_and_get_column<ColumnString>(to);                     \
+        ColumnString::Chars& chars = const_cast<ColumnString::Chars&>(str_col->get_chars());      \
+        ColumnString::Offsets& offsets =                                                          \
+                const_cast<ColumnString::Offsets&>(str_col->get_offsets());                       \
+        int increase_buffer_size = 0;                                                             \
+        *output_value_buffer = reinterpret_cast<int64_t>(chars.data());                           \
+        *output_offsets_ptr = reinterpret_cast<int64_t>(offsets.data());                          \
+        *output_intermediate_state_ptr = chars.size();                                            \
+        jboolean res = env->CallNonvirtualBooleanMethod(executor_obj, executor_cl,                \
+                                                        executor_result_id, to.size() - 1);       \
+        while (res != JNI_TRUE) {                                                                 \
+            int32_t buffer_size = IncreaseReservedBufferSize(increase_buffer_size);               \
+            increase_buffer_size++;                                                               \
+            chars.reserve(chars.size() + buffer_size);                                            \
+            chars.resize(chars.size() + buffer_size);                                             \
+            *output_intermediate_state_ptr = chars.size();                                        \
+            res = env->CallNonvirtualBooleanMethod(executor_obj, executor_cl, executor_result_id, \
+                                                   to.size() - 1);                                \
+        }                                                                                         \
+    } else if (data_col.is_numeric() || data_col.is_column_decimal()) {                           \
+        *output_value_buffer = reinterpret_cast<int64_t>(data_col.get_raw_data().data);           \
+        env->CallNonvirtualBooleanMethod(executor_obj, executor_cl, executor_result_id,           \
+                                         to.size() - 1);                                          \
+    } else {                                                                                      \
+        return Status::InvalidArgument(strings::Substitute(                                       \
+                "Java UDAF doesn't support return type is $0 now !", result_type->get_name()));   \
+    }
+#endif
+            EVALUATE_JAVA_UDAF;
+        } else {
+            *output_null_value = -1;
+            *output_value_buffer = reinterpret_cast<int64_t>(to.get_raw_data().data);
+            auto& data_col = to;
+            EVALUATE_JAVA_UDAF;
+            env->CallNonvirtualBooleanMethod(executor_obj, executor_cl, executor_result_id,
+                                             to.size() - 1);
+        }
+        return Status::OK();
+    }
+
+    static const int32_t INITIAL_RESERVED_BUFFER_SIZE = 1024;
+    // TODO: we need a heuristic strategy to increase buffer size for variable-size output.
+    static inline int32_t IncreaseReservedBufferSize(int n) {
+        return INITIAL_RESERVED_BUFFER_SIZE << n;
+    }
+
+    Status register_func_id(JNIEnv* env) {
+        auto register_id = [&](const char* func_name, const char* func_sign, jmethodID& func_id) {
+            func_id = env->GetMethodID(executor_cl, func_name, func_sign);
+            Status s = JniUtil::GetJniExceptionMsg(env);
+            if (!s.ok()) {
+                return Status::InternalError(
+                        strings::Substitute("Java-Udaf register_func_id meet error and error is $0",
+                                            s.get_error_msg()));
+            }
+            return s;
+        };
+
+        RETURN_IF_ERROR(register_id("<init>", UDAF_EXECUTOR_CTOR_SIGNATURE, executor_ctor_id));
+        RETURN_IF_ERROR(register_id("add", UDAF_EXECUTOR_ADD_SIGNATURE, executor_add_id));
+        RETURN_IF_ERROR(register_id("close", UDAF_EXECUTOR_CLOSE_SIGNATURE, executor_close_id));
+        RETURN_IF_ERROR(register_id("merge", UDAF_EXECUTOR_MERGE_SIGNATURE, executor_merge_id));
+        RETURN_IF_ERROR(
+                register_id("serialize", UDAF_EXECUTOR_SERIALIZE_SIGNATURE, executor_serialize_id));
+        RETURN_IF_ERROR(
+                register_id("getValue", UDAF_EXECUTOR_RESULT_SIGNATURE, executor_result_id));
+        serialize_len_id = env->GetFieldID(executor_cl, "serializeLength", "J");
+        RETURN_ERROR_IF_EXC(env);
+        return Status::OK();
+    }
+
+public:
+    jclass executor_cl;
+    jobject executor_obj;
+    jmethodID executor_ctor_id;
+
+    jmethodID executor_add_id;
+    jmethodID executor_merge_id;
+    jmethodID executor_serialize_id;
+    jmethodID executor_result_id;
+    jmethodID executor_close_id;
+    jfieldID serialize_len_id;
+
+    std::unique_ptr<int64_t[]> input_values_buffer_ptr;
+    std::unique_ptr<int64_t[]> input_nulls_buffer_ptr;
+    std::unique_ptr<int64_t[]> input_offsets_ptrs;
+    std::unique_ptr<int64_t> output_value_buffer;
+    std::unique_ptr<int64_t> output_null_value;
+    std::unique_ptr<int64_t> output_offsets_ptr;
+    std::unique_ptr<int64_t> output_intermediate_state_ptr;
+
+    bool first_init;
+    int argument_size = 0;
+    std::string serialize_data;
+};
+
+class AggregateJavaUdaf final
+        : public IAggregateFunctionDataHelper<AggregateJavaUdafData, AggregateJavaUdaf> {
+public:
+    AggregateJavaUdaf(const TFunction& fn, const DataTypes& argument_types, const Array& parameters,
+                      const DataTypePtr& return_type)
+            : IAggregateFunctionDataHelper(argument_types, parameters),
+              fn_(fn),
+              return_type_(return_type) {}
+    ~AggregateJavaUdaf() = default;
+
+    static AggregateFunctionPtr create(const TFunction& fn, const DataTypes& argument_types,
+                                       const Array& parameters, const DataTypePtr& return_type) {
+        return std::make_shared<AggregateJavaUdaf>(fn, argument_types, parameters, return_type);
+    }
+
+    void create(AggregateDataPtr __restrict place) const override {
+        new (place) Data(argument_types.size());
+    }
+
+    String get_name() const override { return fn_.name.function_name; }
+
+    DataTypePtr get_return_type() const override { return return_type_; }
+
+    void add(AggregateDataPtr __restrict place, const IColumn** columns, size_t row_num,
+             Arena*) const override {
+        Status status = Status::OK();
+        RETURN_IF_STATUS_ERROR(status, data(place).init_udaf(fn_));

Review Comment:
   Why not do the initialization in the `create` function and remove the `first_init` flag?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] BiteTheDDDDt commented on a diff in pull request #9930: [Vectorized][UDF] support java-udaf

Posted by GitBox <gi...@apache.org>.
BiteTheDDDDt commented on code in PR #9930:
URL: https://github.com/apache/incubator-doris/pull/9930#discussion_r887644383


##########
be/src/vec/aggregate_functions/aggregate_function_java_udaf.h:
##########
@@ -0,0 +1,392 @@
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#ifdef LIBJVM
+
+#include <jni.h>
+#include <unistd.h>
+
+#include <cstdint>
+#include <memory>
+
+#include "common/status.h"
+#include "gen_cpp/Exprs_types.h"
+#include "runtime/user_function_cache.h"
+#include "util/jni-util.h"
+#include "vec/aggregate_functions/aggregate_function.h"
+#include "vec/columns/column_string.h"
+#include "vec/common/exception.h"
+#include "vec/common/string_ref.h"
+#include "vec/core/block.h"
+#include "vec/core/column_numbers.h"
+#include "vec/core/field.h"
+#include "vec/core/types.h"
+#include "vec/data_types/data_type_string.h"
+#include "vec/io/io_helper.h"
+
+namespace doris::vectorized {
+
+const char* UDAF_EXECUTOR_CLASS = "org/apache/doris/udf/UdafExecutor";
+const char* UDAF_EXECUTOR_CTOR_SIGNATURE = "([B)V";
+const char* UDAF_EXECUTOR_CLOSE_SIGNATURE = "()V";
+const char* UDAF_EXECUTOR_CREATE_SIGNATURE = "()Ljava/lang/Object;";
+const char* UDAF_EXECUTOR_DESTORY_SIGNATURE = "()V";
+const char* UDAF_EXECUTOR_ADD_SIGNATURE = "(J)V";
+const char* UDAF_EXECUTOR_MERGE_SIGNATURE = "(Ljava/lang/Object;)V";
+const char* UDAF_EXECUTOR_SERIALIZE_SIGNATURE = "(Ljava/lang/Object;)V";
+const char* UDAF_EXECUTOR_DESERIALIZE_SIGNATURE = "(Ljava/lang/Object;)V";
+const char* UDAF_EXECUTOR_RESULT_SIGNATURE = "(J)Z";
+
+struct AggregateJavaUdafData {
+public:
+    AggregateJavaUdafData() = default;
+    AggregateJavaUdafData(int64_t num_args) {
+        argument_size = num_args;
+        first_init = true;
+        input_values_buffer_ptr.reset(new int64_t[num_args]);
+        input_nulls_buffer_ptr.reset(new int64_t[num_args]);
+        input_offsets_ptrs.reset(new int64_t[num_args]);
+        output_value_buffer.reset(new int64_t);
+        output_null_value.reset(new int64_t);
+        batch_size_ptr.reset(new int32_t);
+        output_offsets_ptr.reset(new int64_t);
+        output_intermediate_state_ptr.reset(new int64_t);
+    }
+
+    ~AggregateJavaUdafData() {
+        JNIEnv* env;
+        Status status;
+        RETURN_IF_STATUS_ERROR(status, JniUtil::GetJNIEnv(&env));
+        env->CallNonvirtualVoidMethod(executor_obj, executor_cl_, executor_close_id_);
+        Status s = JniUtil::GetJniExceptionMsg(env);
+        if (!s.ok()) {
+            LOG(WARNING) << "meet some error in destroy: " << s.get_error_msg();
+        }
+        env->DeleteGlobalRef(executor_obj);
+    }
+
+    Status init_udaf(const TFunction& fn) {
+        if (first_init) {
+            JNIEnv* env = nullptr;
+            RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env));
+            if (env == nullptr) {
+                return Status::InternalError("Failed to get/create JVM");
+            }
+            RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, UDAF_EXECUTOR_CLASS, &executor_cl_));
+
+            Status ret_code = register_func_id(env);
+            if (!ret_code.ok()) {
+                LOG(WARNING) << "register_func_id has error : " << ret_code.get_error_msg();
+            }
+
+            // Add a scoped cleanup jni reference object. This cleans up local refs made below.
+            JniLocalFrame jni_frame;
+            {
+                std::string local_location;
+                auto function_cache = UserFunctionCache::instance();
+                RETURN_IF_ERROR(function_cache->get_jarpath(fn.id, fn.hdfs_location, fn.checksum,
+                                                            &local_location));
+                TJavaUdfExecutorCtorParams ctor_params;
+                ctor_params.__set_fn(fn);
+                ctor_params.__set_location(local_location);
+                ctor_params.__set_input_offsets_ptrs((int64_t)input_offsets_ptrs.get());
+                ctor_params.__set_input_buffer_ptrs((int64_t)input_values_buffer_ptr.get());
+                ctor_params.__set_input_nulls_ptrs((int64_t)input_nulls_buffer_ptr.get());
+                ctor_params.__set_output_buffer_ptr((int64_t)output_value_buffer.get());
+
+                ctor_params.__set_output_null_ptr((int64_t)output_null_value.get());
+                ctor_params.__set_output_offsets_ptr((int64_t)output_offsets_ptr.get());
+                ctor_params.__set_output_intermediate_state_ptr(
+                        (int64_t)output_intermediate_state_ptr.get());
+                ctor_params.__set_batch_size_ptr((int64_t)batch_size_ptr.get());
+
+                jbyteArray ctor_params_bytes;
+
+                // Pushed frame will be popped when jni_frame goes out-of-scope.
+                RETURN_IF_ERROR(jni_frame.push(env));
+                RETURN_IF_ERROR(SerializeThriftMsg(env, &ctor_params, &ctor_params_bytes));
+                executor_obj = env->NewObject(executor_cl_, executor_ctor_id_, ctor_params_bytes);
+            }
+            RETURN_ERROR_IF_EXC(env);
+            RETURN_IF_ERROR(JniUtil::LocalToGlobalRef(env, executor_obj, &executor_obj));
+            first_init = false;
+        }
+        return Status::OK();
+    }
+
+    void add(const IColumn** columns, size_t row_num, const DataTypes& argument_types) const {
+        JNIEnv* env = nullptr;
+        Status ret_code = JniUtil::GetJNIEnv(&env);
+        if (env == nullptr || !ret_code.ok()) {
+            LOG(WARNING) << "Java-Udaf get error when add: " << ret_code.get_error_msg();
+        }
+        *batch_size_ptr = columns[0]->size();
+        for (int arg_idx = 0; arg_idx < argument_size; ++arg_idx) {
+            auto data_col = columns[arg_idx];
+            if (auto* nullable = check_and_get_column<const ColumnNullable>(*columns[arg_idx])) {
+                data_col = nullable->get_nested_column_ptr();
+                auto null_col = check_and_get_column<ColumnVector<UInt8>>(
+                        nullable->get_null_map_column_ptr());
+                input_nulls_buffer_ptr.get()[arg_idx] =
+                        reinterpret_cast<int64_t>(null_col->get_data().data());
+            } else {
+                input_nulls_buffer_ptr.get()[arg_idx] = -1;
+            }
+            if (data_col->is_column_string()) {
+                const ColumnString* str_col = check_and_get_column<ColumnString>(data_col);
+                input_values_buffer_ptr.get()[arg_idx] =
+                        reinterpret_cast<int64_t>(str_col->get_chars().data());
+                input_offsets_ptrs.get()[arg_idx] =
+                        reinterpret_cast<int64_t>(str_col->get_offsets().data());
+            } else if (data_col->is_numeric() || data_col->is_column_decimal()) {
+                input_values_buffer_ptr.get()[arg_idx] =
+                        reinterpret_cast<int64_t>(data_col->get_raw_data().data);
+            } else {
+                LOG(WARNING) << "Java UDAF doesn't support type: "
+                             << argument_types[arg_idx]->get_name() << " now !";
+            }
+        }
+        env->CallNonvirtualVoidMethod(executor_obj, executor_cl_, executor_add_id_, row_num);
+    }
+
+    void merge(const AggregateJavaUdafData& rhs) {
+        if (rhs.first_init) {
+            return;
+        }
+        JNIEnv* env = nullptr;
+        Status ret_code = JniUtil::GetJNIEnv(&env);
+        if (env == nullptr || !ret_code.ok()) {
+            LOG(WARNING) << "Java-Udaf get error when merge: " << ret_code.get_error_msg();
+        }
+        serialize_data = rhs.serialize_data;
+        long len = serialize_data.length();

Review Comment:
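   Should `long` be used here? `long` means different things in C++ and Java: in C++ its width is platform-dependent (64-bit on LP64 Linux, 32-bit on Windows), while Java's `long` is always 64-bit. A fixed-width type such as `int64_t` or `jlong` would be safer.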



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] HappenLee commented on a diff in pull request #9930: [Vectorized][UDF] support java-udaf

Posted by GitBox <gi...@apache.org>.
HappenLee commented on code in PR #9930:
URL: https://github.com/apache/incubator-doris/pull/9930#discussion_r889507979


##########
be/src/vec/aggregate_functions/aggregate_function_java_udaf.h:
##########
@@ -0,0 +1,361 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#ifdef LIBJVM
+
+#include <jni.h>
+#include <unistd.h>
+
+#include <cstdint>
+#include <memory>
+
+#include "common/status.h"
+#include "gen_cpp/Exprs_types.h"
+#include "runtime/user_function_cache.h"
+#include "util/jni-util.h"
+#include "vec/aggregate_functions/aggregate_function.h"
+#include "vec/columns/column_string.h"
+#include "vec/common/exception.h"
+#include "vec/common/string_ref.h"
+#include "vec/core/block.h"
+#include "vec/core/column_numbers.h"
+#include "vec/core/field.h"
+#include "vec/core/types.h"
+#include "vec/data_types/data_type_string.h"
+#include "vec/io/io_helper.h"
+
+namespace doris::vectorized {
+
+const char* UDAF_EXECUTOR_CLASS = "org/apache/doris/udf/UdafExecutor";
+const char* UDAF_EXECUTOR_CTOR_SIGNATURE = "([B)V";
+const char* UDAF_EXECUTOR_CLOSE_SIGNATURE = "()V";
+const char* UDAF_EXECUTOR_ADD_SIGNATURE = "(JJ)V";
+const char* UDAF_EXECUTOR_SERIALIZE_SIGNATURE = "(Ljava/lang/Object;)V";
+const char* UDAF_EXECUTOR_MERGE_SIGNATURE = "(Ljava/lang/Object;)V";
+const char* UDAF_EXECUTOR_RESULT_SIGNATURE = "(J)Z";
+
+struct AggregateJavaUdafData {
+public:
+    AggregateJavaUdafData() = default;
+    AggregateJavaUdafData(int64_t num_args) {
+        argument_size = num_args;
+        first_init = true;
+        input_values_buffer_ptr.reset(new int64_t[num_args]);
+        input_nulls_buffer_ptr.reset(new int64_t[num_args]);
+        input_offsets_ptrs.reset(new int64_t[num_args]);
+        output_value_buffer.reset(new int64_t);
+        output_null_value.reset(new int64_t);
+        output_offsets_ptr.reset(new int64_t);
+        output_intermediate_state_ptr.reset(new int64_t);
+    }
+
+    ~AggregateJavaUdafData() {
+        JNIEnv* env;
+        Status status;
+        RETURN_IF_STATUS_ERROR(status, JniUtil::GetJNIEnv(&env));
+        env->CallNonvirtualVoidMethod(executor_obj, executor_cl, executor_close_id);
+        RETURN_IF_STATUS_ERROR(status, JniUtil::GetJniExceptionMsg(env));
+        env->DeleteGlobalRef(executor_obj);
+    }
+
+    Status init_udaf(const TFunction& fn) {
+        if (first_init) {
+            JNIEnv* env = nullptr;
+            RETURN_NOT_OK_STATUS_WITH_WARN(JniUtil::GetJNIEnv(&env),
+                                           "Java-Udaf init_udaf function");
+            RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, UDAF_EXECUTOR_CLASS, &executor_cl));
+            RETURN_NOT_OK_STATUS_WITH_WARN(register_func_id(env),
+                                           "Java-Udaf register_func_id function");
+
+            // Add a scoped cleanup jni reference object. This cleans up local refs made below.
+            JniLocalFrame jni_frame;
+            {
+                std::string local_location;
+                auto function_cache = UserFunctionCache::instance();
+                RETURN_IF_ERROR(function_cache->get_jarpath(fn.id, fn.hdfs_location, fn.checksum,
+                                                            &local_location));
+                TJavaUdfExecutorCtorParams ctor_params;
+                ctor_params.__set_fn(fn);
+                ctor_params.__set_location(local_location);
+                ctor_params.__set_input_offsets_ptrs((int64_t)input_offsets_ptrs.get());
+                ctor_params.__set_input_buffer_ptrs((int64_t)input_values_buffer_ptr.get());
+                ctor_params.__set_input_nulls_ptrs((int64_t)input_nulls_buffer_ptr.get());
+                ctor_params.__set_output_buffer_ptr((int64_t)output_value_buffer.get());
+
+                ctor_params.__set_output_null_ptr((int64_t)output_null_value.get());
+                ctor_params.__set_output_offsets_ptr((int64_t)output_offsets_ptr.get());
+                ctor_params.__set_output_intermediate_state_ptr(
+                        (int64_t)output_intermediate_state_ptr.get());
+
+                jbyteArray ctor_params_bytes;
+
+                // Pushed frame will be popped when jni_frame goes out-of-scope.
+                RETURN_IF_ERROR(jni_frame.push(env));
+                RETURN_IF_ERROR(SerializeThriftMsg(env, &ctor_params, &ctor_params_bytes));
+                executor_obj = env->NewObject(executor_cl, executor_ctor_id, ctor_params_bytes);
+            }
+            RETURN_ERROR_IF_EXC(env);
+            RETURN_IF_ERROR(JniUtil::LocalToGlobalRef(env, executor_obj, &executor_obj));
+            first_init = false;
+        }
+        return Status::OK();
+    }
+
+    Status add(const IColumn** columns, size_t row_num_start, size_t row_num_end,
+               const DataTypes& argument_types) {
+        JNIEnv* env = nullptr;
+        RETURN_NOT_OK_STATUS_WITH_WARN(JniUtil::GetJNIEnv(&env), "Java-Udaf add function");
+        for (int arg_idx = 0; arg_idx < argument_size; ++arg_idx) {
+            auto data_col = columns[arg_idx];
+            if (auto* nullable = check_and_get_column<const ColumnNullable>(*columns[arg_idx])) {
+                data_col = nullable->get_nested_column_ptr();
+                auto null_col = check_and_get_column<ColumnVector<UInt8>>(
+                        nullable->get_null_map_column_ptr());
+                input_nulls_buffer_ptr.get()[arg_idx] =
+                        reinterpret_cast<int64_t>(null_col->get_data().data());
+            } else {
+                input_nulls_buffer_ptr.get()[arg_idx] = -1;
+            }
+            if (data_col->is_column_string()) {
+                const ColumnString* str_col = check_and_get_column<ColumnString>(data_col);
+                input_values_buffer_ptr.get()[arg_idx] =
+                        reinterpret_cast<int64_t>(str_col->get_chars().data());
+                input_offsets_ptrs.get()[arg_idx] =
+                        reinterpret_cast<int64_t>(str_col->get_offsets().data());
+            } else if (data_col->is_numeric() || data_col->is_column_decimal()) {
+                input_values_buffer_ptr.get()[arg_idx] =
+                        reinterpret_cast<int64_t>(data_col->get_raw_data().data);
+            } else {
+                return Status::InvalidArgument(
+                        strings::Substitute("Java UDAF doesn't support type is $0 now !",
+                                            argument_types[arg_idx]->get_name()));
+            }
+        }
+        env->CallNonvirtualVoidMethod(executor_obj, executor_cl, executor_add_id, row_num_start,
+                                      row_num_end);
+        return Status::OK();
+    }
+
+    Status merge(const AggregateJavaUdafData& rhs) {
+        if (rhs.first_init) {

Review Comment:
   The use of `first_init` here is weird; please rethink the logic.



##########
be/src/vec/aggregate_functions/aggregate_function_java_udaf.h:
##########
@@ -0,0 +1,361 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#ifdef LIBJVM
+
+#include <jni.h>
+#include <unistd.h>
+
+#include <cstdint>
+#include <memory>
+
+#include "common/status.h"
+#include "gen_cpp/Exprs_types.h"
+#include "runtime/user_function_cache.h"
+#include "util/jni-util.h"
+#include "vec/aggregate_functions/aggregate_function.h"
+#include "vec/columns/column_string.h"
+#include "vec/common/exception.h"
+#include "vec/common/string_ref.h"
+#include "vec/core/block.h"
+#include "vec/core/column_numbers.h"
+#include "vec/core/field.h"
+#include "vec/core/types.h"
+#include "vec/data_types/data_type_string.h"
+#include "vec/io/io_helper.h"
+
+namespace doris::vectorized {
+
+// JNI descriptors for org.apache.doris.udf.UdafExecutor. The class name is slash-separated;
+// a method descriptor has the form "(<argument types>)<return type>" with type codes
+// V = void, Z = boolean, J = long, [B = byte[], Lpkg/Name; = an object of class pkg.Name.
+// See the JNI specification, "Type Signatures":
+// https://docs.oracle.com/javase/8/docs/technotes/guides/jni/spec/types.html#type_signatures
+// `constexpr` gives these header constants internal linkage, so including this header from
+// several translation units does not produce duplicate-symbol link errors.
+constexpr const char* UDAF_EXECUTOR_CLASS = "org/apache/doris/udf/UdafExecutor";
+// Constructor taking one byte[] and returning void: UdafExecutor(byte[]).
+constexpr const char* UDAF_EXECUTOR_CTOR_SIGNATURE = "([B)V";

Review Comment:
   What do the SIGNATURE strings mean? Please add a comment or a link to the JDK documentation so that others know how to develop against them.



##########
be/src/vec/aggregate_functions/aggregate_function_java_udaf.h:
##########
@@ -0,0 +1,361 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#ifdef LIBJVM
+
+#include <jni.h>
+#include <unistd.h>
+
+#include <cstdint>
+#include <memory>
+
+#include "common/status.h"
+#include "gen_cpp/Exprs_types.h"
+#include "runtime/user_function_cache.h"
+#include "util/jni-util.h"
+#include "vec/aggregate_functions/aggregate_function.h"
+#include "vec/columns/column_string.h"
+#include "vec/common/exception.h"
+#include "vec/common/string_ref.h"
+#include "vec/core/block.h"
+#include "vec/core/column_numbers.h"
+#include "vec/core/field.h"
+#include "vec/core/types.h"
+#include "vec/data_types/data_type_string.h"
+#include "vec/io/io_helper.h"
+
+namespace doris::vectorized {
+
+// JNI descriptors for org.apache.doris.udf.UdafExecutor. The class name is slash-separated;
+// a method descriptor has the form "(<argument types>)<return type>" with type codes
+// V = void, Z = boolean, J = long, [B = byte[], Lpkg/Name; = an object of class pkg.Name.
+// See the JNI specification, "Type Signatures":
+// https://docs.oracle.com/javase/8/docs/technotes/guides/jni/spec/types.html#type_signatures
+// `constexpr` gives these header constants internal linkage, so including this header from
+// several translation units does not produce duplicate-symbol link errors.
+constexpr const char* UDAF_EXECUTOR_CLASS = "org/apache/doris/udf/UdafExecutor";
+// UdafExecutor(byte[]): receives the serialized TJavaUdfExecutorCtorParams.
+constexpr const char* UDAF_EXECUTOR_CTOR_SIGNATURE = "([B)V";
+// void close()
+constexpr const char* UDAF_EXECUTOR_CLOSE_SIGNATURE = "()V";
+// void add(long, long): receives the row range given to add().
+constexpr const char* UDAF_EXECUTOR_ADD_SIGNATURE = "(JJ)V";
+// void serialize(Object): invoked with a direct ByteBuffer.
+constexpr const char* UDAF_EXECUTOR_SERIALIZE_SIGNATURE = "(Ljava/lang/Object;)V";
+// void merge(Object): invoked with a direct ByteBuffer.
+constexpr const char* UDAF_EXECUTOR_MERGE_SIGNATURE = "(Ljava/lang/Object;)V";
+// boolean getValue(long): receives the output row index.
+constexpr const char* UDAF_EXECUTOR_RESULT_SIGNATURE = "(J)Z";
+
+struct AggregateJavaUdafData {
+public:
+    AggregateJavaUdafData() = default;
+    // Allocates the pointer-sized slots whose addresses are handed to the Java executor in
+    // init_udaf(): one input slot per argument for values, null maps and string offsets, and
+    // single output slots. All slots start zeroed, so the executor never observes
+    // indeterminate values, e.g. offsets for arguments that are not strings.
+    AggregateJavaUdafData(int64_t num_args) {
+        argument_size = num_args;
+        first_init = true;
+        input_values_buffer_ptr = std::make_unique<int64_t[]>(num_args);
+        input_nulls_buffer_ptr = std::make_unique<int64_t[]>(num_args);
+        input_offsets_ptrs = std::make_unique<int64_t[]>(num_args);
+        output_value_buffer = std::make_unique<int64_t>();
+        output_null_value = std::make_unique<int64_t>();
+        output_offsets_ptr = std::make_unique<int64_t>();
+        output_intermediate_state_ptr = std::make_unique<int64_t>();
+    }
+
+    // Closes the Java executor and releases the global reference created in init_udaf().
+    // NOTE(review): this also runs for states whose init_udaf() never completed (first_init
+    // still set), where executor_obj was never assigned; confirm callers always initialize.
+    // NOTE(review): executor_cl (from JniUtil::GetGlobalClassRef) is not released here;
+    // confirm whether that reference is shared or leaks once per state.
+    ~AggregateJavaUdafData() {
+        JNIEnv* env;
+        Status status;
+        RETURN_IF_STATUS_ERROR(status, JniUtil::GetJNIEnv(&env));
+        env->CallNonvirtualVoidMethod(executor_obj, executor_cl, executor_close_id);
+        // DeleteGlobalRef may be called while an exception is pending, so release the executor
+        // before checking close()'s outcome; a failing close() can no longer leak it.
+        env->DeleteGlobalRef(executor_obj);
+        RETURN_IF_STATUS_ERROR(status, JniUtil::GetJniExceptionMsg(env));
+    }
+
+    // Creates this state's Java UdafExecutor on first use; later calls are no-ops.
+    // It resolves the executor class and method ids, asks UserFunctionCache for a local jar
+    // path, and constructs the executor from a serialized TJavaUdfExecutorCtorParams that
+    // carries the raw addresses of the input/output slots owned by this object.
+    // NOTE(review): the class reference and method ids are resolved again for every state.
+    Status init_udaf(const TFunction& fn) {
+        // first_init stays true until the executor has been created successfully.
+        if (first_init) {
+            JNIEnv* env = nullptr;
+            RETURN_NOT_OK_STATUS_WITH_WARN(JniUtil::GetJNIEnv(&env),
+                                           "Java-Udaf init_udaf function");
+            RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, UDAF_EXECUTOR_CLASS, &executor_cl));
+            RETURN_NOT_OK_STATUS_WITH_WARN(register_func_id(env),
+                                           "Java-Udaf register_func_id function");
+
+            // Add a scoped cleanup jni reference object. This cleans up local refs made below.
+            JniLocalFrame jni_frame;
+            {
+                std::string local_location;
+                auto function_cache = UserFunctionCache::instance();
+                // Local path of the jar identified by fn.id / fn.hdfs_location / fn.checksum.
+                RETURN_IF_ERROR(function_cache->get_jarpath(fn.id, fn.hdfs_location, fn.checksum,
+                                                            &local_location));
+                // The slots are passed as raw addresses (cast to int64_t); the executor presumably
+                // dereferences them later, so they must outlive it.
+                TJavaUdfExecutorCtorParams ctor_params;
+                ctor_params.__set_fn(fn);
+                ctor_params.__set_location(local_location);
+                ctor_params.__set_input_offsets_ptrs((int64_t)input_offsets_ptrs.get());
+                ctor_params.__set_input_buffer_ptrs((int64_t)input_values_buffer_ptr.get());
+                ctor_params.__set_input_nulls_ptrs((int64_t)input_nulls_buffer_ptr.get());
+                ctor_params.__set_output_buffer_ptr((int64_t)output_value_buffer.get());
+
+                ctor_params.__set_output_null_ptr((int64_t)output_null_value.get());
+                ctor_params.__set_output_offsets_ptr((int64_t)output_offsets_ptr.get());
+                ctor_params.__set_output_intermediate_state_ptr(
+                        (int64_t)output_intermediate_state_ptr.get());
+
+                jbyteArray ctor_params_bytes;
+
+                // Pushed frame will be popped when jni_frame goes out-of-scope.
+                RETURN_IF_ERROR(jni_frame.push(env));
+                RETURN_IF_ERROR(SerializeThriftMsg(env, &ctor_params, &ctor_params_bytes));
+                executor_obj = env->NewObject(executor_cl, executor_ctor_id, ctor_params_bytes);
+            }
+            RETURN_ERROR_IF_EXC(env);
+            // NewObject returned a local reference that dies when jni_frame is popped on return;
+            // replace it in place with a global reference (deleted by the destructor).
+            RETURN_IF_ERROR(JniUtil::LocalToGlobalRef(env, executor_obj, &executor_obj));
+            first_init = false;
+        }
+        return Status::OK();
+    }
+
+    // Runs UdafExecutor.add() over the rows between row_num_start and row_num_end, passed
+    // through unchanged. Column data is not copied. The raw addresses of each argument's value
+    // buffer, null map and, for strings, offsets are written into the per-argument slots shared
+    // with the executor. Unsupported argument types yield InvalidArgument; a Java exception is
+    // returned as an error Status.
+    Status add(const IColumn** columns, size_t row_num_start, size_t row_num_end,
+               const DataTypes& argument_types) {
+        JNIEnv* env = nullptr;
+        RETURN_NOT_OK_STATUS_WITH_WARN(JniUtil::GetJNIEnv(&env), "Java-Udaf add function");
+        for (int arg_idx = 0; arg_idx < argument_size; ++arg_idx) {
+            auto data_col = columns[arg_idx];
+            if (auto* nullable = check_and_get_column<const ColumnNullable>(*columns[arg_idx])) {
+                // Nullable argument: publish its null map and look through to the values.
+                data_col = nullable->get_nested_column_ptr();
+                auto null_col = check_and_get_column<ColumnVector<UInt8>>(
+                        nullable->get_null_map_column_ptr());
+                input_nulls_buffer_ptr.get()[arg_idx] =
+                        reinterpret_cast<int64_t>(null_col->get_data().data());
+            } else {
+                // -1 marks an argument without a null map.
+                input_nulls_buffer_ptr.get()[arg_idx] = -1;
+            }
+            if (data_col->is_column_string()) {
+                // Strings need both the chars buffer and the offsets array.
+                const ColumnString* str_col = check_and_get_column<ColumnString>(data_col);
+                input_values_buffer_ptr.get()[arg_idx] =
+                        reinterpret_cast<int64_t>(str_col->get_chars().data());
+                input_offsets_ptrs.get()[arg_idx] =
+                        reinterpret_cast<int64_t>(str_col->get_offsets().data());
+            } else if (data_col->is_numeric() || data_col->is_column_decimal()) {
+                input_values_buffer_ptr.get()[arg_idx] =
+                        reinterpret_cast<int64_t>(data_col->get_raw_data().data);
+            } else {
+                return Status::InvalidArgument(
+                        strings::Substitute("Java UDAF doesn't support type is $0 now !",
+                                            argument_types[arg_idx]->get_name()));
+            }
+        }
+        // The Java method takes two longs ("(JJ)V"); pass exact jlong values through varargs.
+        env->CallNonvirtualVoidMethod(executor_obj, executor_cl, executor_add_id,
+                                      static_cast<jlong>(row_num_start),
+                                      static_cast<jlong>(row_num_end));
+        // Report an exception thrown by UdafExecutor.add() instead of leaving it pending.
+        return JniUtil::GetJniExceptionMsg(env);
+    }
+
+    // Merges the serialized state held by `rhs` (filled by read()) into this state's executor
+    // through UdafExecutor.merge(). A Java exception is returned as an error Status.
+    Status merge(const AggregateJavaUdafData& rhs) {
+        // Skip states whose init_udaf() never ran (first_init still set).
+        if (rhs.first_init) {
+            return Status::OK();
+        }
+        JNIEnv* env = nullptr;
+        RETURN_NOT_OK_STATUS_WITH_WARN(JniUtil::GetJNIEnv(&env), "Java-Udaf merge function");
+        // NewDirectByteBuffer needs a mutable address and `rhs` is const, so stage the bytes in
+        // this object's own buffer, which stays alive while Java reads it.
+        serialize_data = rhs.serialize_data;
+        jlong len = serialize_data.length();
+        jobject data = env->NewDirectByteBuffer(serialize_data.data(), len);
+        env->CallNonvirtualVoidMethod(executor_obj, executor_cl, executor_merge_id, data);
+        // Free the ByteBuffer's local reference now; repeated calls from native code would
+        // otherwise accumulate local references (safe even with an exception pending).
+        if (data != nullptr) {
+            env->DeleteLocalRef(data);
+        }
+        return JniUtil::GetJniExceptionMsg(env);
+    }
+
+    // Serializes this state as the first_init flag followed by the executor's serialized
+    // bytes; read() consumes the same layout.
+    Status write(BufferWritable& buf) {
+        write_binary(first_init, buf);
+        if (first_init) {
+            // init_udaf() never ran, so there is no executor to query. Emit an empty payload to
+            // keep the layout read() expects; merge() skips states with first_init set.
+            serialize_data.clear();
+            write_binary(serialize_data, buf);
+            return Status::OK();
+        }
+        JNIEnv* env = nullptr;
+        RETURN_NOT_OK_STATUS_WITH_WARN(JniUtil::GetJNIEnv(&env), "Java-Udaf write function");
+        // Size the staging buffer from UdafExecutor.serializeLength, then let serialize() fill
+        // it through a direct ByteBuffer wrapping that memory.
+        jlong len = env->GetLongField(executor_obj, serialize_len_id);
+        serialize_data.resize(len);
+        jobject data = env->NewDirectByteBuffer(serialize_data.data(), len);
+        env->CallNonvirtualVoidMethod(executor_obj, executor_cl, executor_serialize_id, data);
+        if (data != nullptr) {
+            env->DeleteLocalRef(data);
+        }
+        // Do not emit a partially filled payload if serialize() threw.
+        RETURN_IF_ERROR(JniUtil::GetJniExceptionMsg(env));
+        write_binary(serialize_data, buf);
+        return Status::OK();
+    }
+
+    // Restores what write() produced: the first_init flag, then the serialized executor bytes,
+    // which stay in serialize_data until merge() hands them to Java.
+    void read(BufferReadable& buf) {
+        read_binary(this->first_init, buf);
+        read_binary(this->serialize_data, buf);
+    }
+
+    // Appends one row to `to` and has UdafExecutor.getValue() write this state's final value
+    // into it (row index `to.size() - 1`) through the output slots published below.
+    // Unsupported result types yield InvalidArgument; a Java exception is returned as an error.
+    // NOTE(review): the insert_default() for string columns was questioned in review.
+    Status get(IColumn& to, const DataTypePtr& result_type) const {
+        to.insert_default();
+        JNIEnv* env = nullptr;
+        RETURN_NOT_OK_STATUS_WITH_WARN(JniUtil::GetJNIEnv(&env), "Java-Udaf get value function");
+        const jlong row = static_cast<jlong>(to.size() - 1);
+
+        // Publishes the buffers of `data_col` (the value column: the nested column when the
+        // result is nullable) and invokes getValue() exactly once per successful evaluation.
+        auto evaluate = [&](IColumn& data_col) -> Status {
+            if (data_col.is_column_string()) {
+                // Cast `data_col`, not `to`: for a nullable result `to` is a ColumnNullable and
+                // the cast would yield nullptr.
+                const ColumnString* str_col = check_and_get_column<ColumnString>(data_col);
+                auto& chars = const_cast<ColumnString::Chars&>(str_col->get_chars());
+                auto& offsets = const_cast<ColumnString::Offsets&>(str_col->get_offsets());
+                *output_value_buffer = reinterpret_cast<int64_t>(chars.data());
+                *output_offsets_ptr = reinterpret_cast<int64_t>(offsets.data());
+                *output_intermediate_state_ptr = chars.size();
+                jboolean res = env->CallNonvirtualBooleanMethod(executor_obj, executor_cl,
+                                                                executor_result_id, row);
+                // A false result is treated as "chars buffer too small": grow it and retry.
+                int increase_buffer_size = 0;
+                while (res != JNI_TRUE) {
+                    // A Java exception also yields false; stop instead of growing forever.
+                    RETURN_IF_ERROR(JniUtil::GetJniExceptionMsg(env));
+                    int32_t buffer_size = IncreaseReservedBufferSize(increase_buffer_size);
+                    increase_buffer_size++;
+                    chars.reserve(chars.size() + buffer_size);
+                    chars.resize(chars.size() + buffer_size);
+                    // resize() may have reallocated; republish the current buffer address.
+                    *output_value_buffer = reinterpret_cast<int64_t>(chars.data());
+                    *output_intermediate_state_ptr = chars.size();
+                    res = env->CallNonvirtualBooleanMethod(executor_obj, executor_cl,
+                                                           executor_result_id, row);
+                }
+            } else if (data_col.is_numeric() || data_col.is_column_decimal()) {
+                *output_value_buffer = reinterpret_cast<int64_t>(data_col.get_raw_data().data);
+                env->CallNonvirtualBooleanMethod(executor_obj, executor_cl, executor_result_id,
+                                                 row);
+            } else {
+                return Status::InvalidArgument(
+                        strings::Substitute("Java UDAF doesn't support return type is $0 now !",
+                                            result_type->get_name()));
+            }
+            return JniUtil::GetJniExceptionMsg(env);
+        };
+
+        if (result_type->is_nullable()) {
+            auto& nullable = assert_cast<ColumnNullable&>(to);
+            *output_null_value =
+                    reinterpret_cast<int64_t>(nullable.get_null_map_column().get_raw_data().data);
+            return evaluate(nullable.get_nested_column());
+        }
+        // -1 marks "no null map", mirroring how add() flags non-nullable inputs.
+        *output_null_value = -1;
+        return evaluate(to);
+    }
+
+    // Size of the first extra chunk reserved for a variable-length (string) result in get().
+    static const int32_t INITIAL_RESERVED_BUFFER_SIZE = 1024;
+    // Returns INITIAL_RESERVED_BUFFER_SIZE * 2^n, so each retry in get() reserves twice as
+    // much as the previous one.
+    // TODO: we need a heuristic strategy to increase buffer size for variable-size output.
+    static inline int32_t IncreaseReservedBufferSize(int n) {
+        const int32_t initial_size = INITIAL_RESERVED_BUFFER_SIZE;
+        return initial_size << n;
+    }
+
+    // Looks up the UdafExecutor method ids and the serializeLength field id on executor_cl.
+    // A failed method lookup is reported as InternalError carrying the JNI exception message.
+    Status register_func_id(JNIEnv* env) {
+        auto lookup_method = [&](const char* name, const char* signature, jmethodID& out_id) {
+            out_id = env->GetMethodID(executor_cl, name, signature);
+            Status jni_status = JniUtil::GetJniExceptionMsg(env);
+            if (jni_status.ok()) {
+                return jni_status;
+            }
+            return Status::InternalError(
+                    strings::Substitute("Java-Udaf register_func_id meet error and error is $0",
+                                        jni_status.get_error_msg()));
+        };
+
+        // Resolved in this order; the first failure aborts the remaining lookups.
+        struct MethodSpec {
+            const char* name;
+            const char* signature;
+            jmethodID* id;
+        };
+        const MethodSpec methods[] = {
+                {"<init>", UDAF_EXECUTOR_CTOR_SIGNATURE, &executor_ctor_id},
+                {"add", UDAF_EXECUTOR_ADD_SIGNATURE, &executor_add_id},
+                {"close", UDAF_EXECUTOR_CLOSE_SIGNATURE, &executor_close_id},
+                {"merge", UDAF_EXECUTOR_MERGE_SIGNATURE, &executor_merge_id},
+                {"serialize", UDAF_EXECUTOR_SERIALIZE_SIGNATURE, &executor_serialize_id},
+                {"getValue", UDAF_EXECUTOR_RESULT_SIGNATURE, &executor_result_id},
+        };
+        for (const MethodSpec& spec : methods) {
+            RETURN_IF_ERROR(lookup_method(spec.name, spec.signature, *spec.id));
+        }
+
+        serialize_len_id = env->GetFieldID(executor_cl, "serializeLength", "J");
+        RETURN_ERROR_IF_EXC(env);
+        return Status::OK();
+    }
+
+public:
+    // NOTE(review): these members are public; consider narrowing access if nothing outside
+    // this struct needs them.
+    // Global reference to the UdafExecutor class, obtained in init_udaf().
+    jclass executor_cl;
+    // This state's UdafExecutor instance: a global reference once init_udaf() succeeds,
+    // deleted by the destructor.
+    jobject executor_obj;
+    // Method ids resolved by register_func_id().
+    jmethodID executor_ctor_id;
+
+    jmethodID executor_add_id;
+    jmethodID executor_merge_id;
+    jmethodID executor_serialize_id;
+    jmethodID executor_result_id;
+    jmethodID executor_close_id;
+    // Field id of UdafExecutor.serializeLength (a Java long), read by write().
+    jfieldID serialize_len_id;
+
+    // One slot per argument holding the address of that argument's value buffer (set in add()).
+    std::unique_ptr<int64_t[]> input_values_buffer_ptr;

Review Comment:
   Why do these members need to be `public`?



##########
be/src/vec/aggregate_functions/aggregate_function_java_udaf.h:
##########
@@ -0,0 +1,361 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#ifdef LIBJVM
+
+#include <jni.h>
+#include <unistd.h>
+
+#include <cstdint>
+#include <memory>
+
+#include "common/status.h"
+#include "gen_cpp/Exprs_types.h"
+#include "runtime/user_function_cache.h"
+#include "util/jni-util.h"
+#include "vec/aggregate_functions/aggregate_function.h"
+#include "vec/columns/column_string.h"
+#include "vec/common/exception.h"
+#include "vec/common/string_ref.h"
+#include "vec/core/block.h"
+#include "vec/core/column_numbers.h"
+#include "vec/core/field.h"
+#include "vec/core/types.h"
+#include "vec/data_types/data_type_string.h"
+#include "vec/io/io_helper.h"
+
+namespace doris::vectorized {
+
+// JNI descriptors for org.apache.doris.udf.UdafExecutor. The class name is slash-separated;
+// a method descriptor has the form "(<argument types>)<return type>" with type codes
+// V = void, Z = boolean, J = long, [B = byte[], Lpkg/Name; = an object of class pkg.Name.
+// See the JNI specification, "Type Signatures":
+// https://docs.oracle.com/javase/8/docs/technotes/guides/jni/spec/types.html#type_signatures
+// `constexpr` gives these header constants internal linkage, so including this header from
+// several translation units does not produce duplicate-symbol link errors.
+constexpr const char* UDAF_EXECUTOR_CLASS = "org/apache/doris/udf/UdafExecutor";
+// UdafExecutor(byte[]): receives the serialized TJavaUdfExecutorCtorParams.
+constexpr const char* UDAF_EXECUTOR_CTOR_SIGNATURE = "([B)V";
+// void close()
+constexpr const char* UDAF_EXECUTOR_CLOSE_SIGNATURE = "()V";
+// void add(long, long): receives the row range given to add().
+constexpr const char* UDAF_EXECUTOR_ADD_SIGNATURE = "(JJ)V";
+// void serialize(Object): invoked with a direct ByteBuffer.
+constexpr const char* UDAF_EXECUTOR_SERIALIZE_SIGNATURE = "(Ljava/lang/Object;)V";
+// void merge(Object): invoked with a direct ByteBuffer.
+constexpr const char* UDAF_EXECUTOR_MERGE_SIGNATURE = "(Ljava/lang/Object;)V";
+// boolean getValue(long): receives the output row index.
+constexpr const char* UDAF_EXECUTOR_RESULT_SIGNATURE = "(J)Z";
+
+struct AggregateJavaUdafData {
+public:
+    AggregateJavaUdafData() = default;
+    // Allocates the pointer-sized slots whose addresses are handed to the Java executor in
+    // init_udaf(): one input slot per argument for values, null maps and string offsets, and
+    // single output slots. All slots start zeroed, so the executor never observes
+    // indeterminate values, e.g. offsets for arguments that are not strings.
+    AggregateJavaUdafData(int64_t num_args) {
+        argument_size = num_args;
+        first_init = true;
+        input_values_buffer_ptr = std::make_unique<int64_t[]>(num_args);
+        input_nulls_buffer_ptr = std::make_unique<int64_t[]>(num_args);
+        input_offsets_ptrs = std::make_unique<int64_t[]>(num_args);
+        output_value_buffer = std::make_unique<int64_t>();
+        output_null_value = std::make_unique<int64_t>();
+        output_offsets_ptr = std::make_unique<int64_t>();
+        output_intermediate_state_ptr = std::make_unique<int64_t>();
+    }
+
+    // Closes the Java executor and releases the global reference created in init_udaf().
+    // NOTE(review): this also runs for states whose init_udaf() never completed (first_init
+    // still set), where executor_obj was never assigned; confirm callers always initialize.
+    // NOTE(review): executor_cl (from JniUtil::GetGlobalClassRef) is not released here;
+    // confirm whether that reference is shared or leaks once per state.
+    ~AggregateJavaUdafData() {
+        JNIEnv* env;
+        Status status;
+        RETURN_IF_STATUS_ERROR(status, JniUtil::GetJNIEnv(&env));
+        env->CallNonvirtualVoidMethod(executor_obj, executor_cl, executor_close_id);
+        // DeleteGlobalRef may be called while an exception is pending, so release the executor
+        // before checking close()'s outcome; a failing close() can no longer leak it.
+        env->DeleteGlobalRef(executor_obj);
+        RETURN_IF_STATUS_ERROR(status, JniUtil::GetJniExceptionMsg(env));
+    }
+
+    // Creates this state's Java UdafExecutor on first use; later calls are no-ops.
+    // It resolves the executor class and method ids, asks UserFunctionCache for a local jar
+    // path, and constructs the executor from a serialized TJavaUdfExecutorCtorParams that
+    // carries the raw addresses of the input/output slots owned by this object.
+    // NOTE(review): the class reference and method ids are resolved again for every state.
+    Status init_udaf(const TFunction& fn) {
+        // first_init stays true until the executor has been created successfully.
+        if (first_init) {
+            JNIEnv* env = nullptr;
+            RETURN_NOT_OK_STATUS_WITH_WARN(JniUtil::GetJNIEnv(&env),
+                                           "Java-Udaf init_udaf function");
+            RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, UDAF_EXECUTOR_CLASS, &executor_cl));
+            RETURN_NOT_OK_STATUS_WITH_WARN(register_func_id(env),
+                                           "Java-Udaf register_func_id function");
+
+            // Add a scoped cleanup jni reference object. This cleans up local refs made below.
+            JniLocalFrame jni_frame;
+            {
+                std::string local_location;
+                auto function_cache = UserFunctionCache::instance();
+                // Local path of the jar identified by fn.id / fn.hdfs_location / fn.checksum.
+                RETURN_IF_ERROR(function_cache->get_jarpath(fn.id, fn.hdfs_location, fn.checksum,
+                                                            &local_location));
+                // The slots are passed as raw addresses (cast to int64_t); the executor presumably
+                // dereferences them later, so they must outlive it.
+                TJavaUdfExecutorCtorParams ctor_params;
+                ctor_params.__set_fn(fn);
+                ctor_params.__set_location(local_location);
+                ctor_params.__set_input_offsets_ptrs((int64_t)input_offsets_ptrs.get());
+                ctor_params.__set_input_buffer_ptrs((int64_t)input_values_buffer_ptr.get());
+                ctor_params.__set_input_nulls_ptrs((int64_t)input_nulls_buffer_ptr.get());
+                ctor_params.__set_output_buffer_ptr((int64_t)output_value_buffer.get());
+
+                ctor_params.__set_output_null_ptr((int64_t)output_null_value.get());
+                ctor_params.__set_output_offsets_ptr((int64_t)output_offsets_ptr.get());
+                ctor_params.__set_output_intermediate_state_ptr(
+                        (int64_t)output_intermediate_state_ptr.get());
+
+                jbyteArray ctor_params_bytes;
+
+                // Pushed frame will be popped when jni_frame goes out-of-scope.
+                RETURN_IF_ERROR(jni_frame.push(env));
+                RETURN_IF_ERROR(SerializeThriftMsg(env, &ctor_params, &ctor_params_bytes));
+                executor_obj = env->NewObject(executor_cl, executor_ctor_id, ctor_params_bytes);
+            }
+            RETURN_ERROR_IF_EXC(env);
+            // NewObject returned a local reference that dies when jni_frame is popped on return;
+            // replace it in place with a global reference (deleted by the destructor).
+            RETURN_IF_ERROR(JniUtil::LocalToGlobalRef(env, executor_obj, &executor_obj));
+            first_init = false;
+        }
+        return Status::OK();
+    }
+
+    // Runs UdafExecutor.add() over the rows between row_num_start and row_num_end, passed
+    // through unchanged. Column data is not copied. The raw addresses of each argument's value
+    // buffer, null map and, for strings, offsets are written into the per-argument slots shared
+    // with the executor. Unsupported argument types yield InvalidArgument; a Java exception is
+    // returned as an error Status.
+    Status add(const IColumn** columns, size_t row_num_start, size_t row_num_end,
+               const DataTypes& argument_types) {
+        JNIEnv* env = nullptr;
+        RETURN_NOT_OK_STATUS_WITH_WARN(JniUtil::GetJNIEnv(&env), "Java-Udaf add function");
+        for (int arg_idx = 0; arg_idx < argument_size; ++arg_idx) {
+            auto data_col = columns[arg_idx];
+            if (auto* nullable = check_and_get_column<const ColumnNullable>(*columns[arg_idx])) {
+                // Nullable argument: publish its null map and look through to the values.
+                data_col = nullable->get_nested_column_ptr();
+                auto null_col = check_and_get_column<ColumnVector<UInt8>>(
+                        nullable->get_null_map_column_ptr());
+                input_nulls_buffer_ptr.get()[arg_idx] =
+                        reinterpret_cast<int64_t>(null_col->get_data().data());
+            } else {
+                // -1 marks an argument without a null map.
+                input_nulls_buffer_ptr.get()[arg_idx] = -1;
+            }
+            if (data_col->is_column_string()) {
+                // Strings need both the chars buffer and the offsets array.
+                const ColumnString* str_col = check_and_get_column<ColumnString>(data_col);
+                input_values_buffer_ptr.get()[arg_idx] =
+                        reinterpret_cast<int64_t>(str_col->get_chars().data());
+                input_offsets_ptrs.get()[arg_idx] =
+                        reinterpret_cast<int64_t>(str_col->get_offsets().data());
+            } else if (data_col->is_numeric() || data_col->is_column_decimal()) {
+                input_values_buffer_ptr.get()[arg_idx] =
+                        reinterpret_cast<int64_t>(data_col->get_raw_data().data);
+            } else {
+                return Status::InvalidArgument(
+                        strings::Substitute("Java UDAF doesn't support type is $0 now !",
+                                            argument_types[arg_idx]->get_name()));
+            }
+        }
+        // The Java method takes two longs ("(JJ)V"); pass exact jlong values through varargs.
+        env->CallNonvirtualVoidMethod(executor_obj, executor_cl, executor_add_id,
+                                      static_cast<jlong>(row_num_start),
+                                      static_cast<jlong>(row_num_end));
+        // Report an exception thrown by UdafExecutor.add() instead of leaving it pending.
+        return JniUtil::GetJniExceptionMsg(env);
+    }
+
+    // Merges the serialized state held by `rhs` (filled by read()) into this state's executor
+    // through UdafExecutor.merge(). A Java exception is returned as an error Status.
+    Status merge(const AggregateJavaUdafData& rhs) {
+        // Skip states whose init_udaf() never ran (first_init still set).
+        if (rhs.first_init) {
+            return Status::OK();
+        }
+        JNIEnv* env = nullptr;
+        RETURN_NOT_OK_STATUS_WITH_WARN(JniUtil::GetJNIEnv(&env), "Java-Udaf merge function");
+        // NewDirectByteBuffer needs a mutable address and `rhs` is const, so stage the bytes in
+        // this object's own buffer, which stays alive while Java reads it.
+        serialize_data = rhs.serialize_data;
+        jlong len = serialize_data.length();
+        jobject data = env->NewDirectByteBuffer(serialize_data.data(), len);
+        env->CallNonvirtualVoidMethod(executor_obj, executor_cl, executor_merge_id, data);
+        // Free the ByteBuffer's local reference now; repeated calls from native code would
+        // otherwise accumulate local references (safe even with an exception pending).
+        if (data != nullptr) {
+            env->DeleteLocalRef(data);
+        }
+        return JniUtil::GetJniExceptionMsg(env);
+    }
+
+    // Serializes this state as the first_init flag followed by the executor's serialized
+    // bytes; read() consumes the same layout.
+    Status write(BufferWritable& buf) {
+        write_binary(first_init, buf);
+        if (first_init) {
+            // init_udaf() never ran, so there is no executor to query. Emit an empty payload to
+            // keep the layout read() expects; merge() skips states with first_init set.
+            serialize_data.clear();
+            write_binary(serialize_data, buf);
+            return Status::OK();
+        }
+        JNIEnv* env = nullptr;
+        RETURN_NOT_OK_STATUS_WITH_WARN(JniUtil::GetJNIEnv(&env), "Java-Udaf write function");
+        // Size the staging buffer from UdafExecutor.serializeLength, then let serialize() fill
+        // it through a direct ByteBuffer wrapping that memory.
+        jlong len = env->GetLongField(executor_obj, serialize_len_id);
+        serialize_data.resize(len);
+        jobject data = env->NewDirectByteBuffer(serialize_data.data(), len);
+        env->CallNonvirtualVoidMethod(executor_obj, executor_cl, executor_serialize_id, data);
+        if (data != nullptr) {
+            env->DeleteLocalRef(data);
+        }
+        // Do not emit a partially filled payload if serialize() threw.
+        RETURN_IF_ERROR(JniUtil::GetJniExceptionMsg(env));
+        write_binary(serialize_data, buf);
+        return Status::OK();
+    }
+
+    // Restores what write() produced: the first_init flag, then the serialized executor bytes,
+    // which stay in serialize_data until merge() hands them to Java.
+    void read(BufferReadable& buf) {
+        read_binary(this->first_init, buf);
+        read_binary(this->serialize_data, buf);
+    }
+
+    Status get(IColumn& to, const DataTypePtr& result_type) const {
+        to.insert_default();

Review Comment:
   Please rethink the `insert_default()` call for string columns.



##########
be/src/vec/aggregate_functions/aggregate_function_java_udaf.h:
##########
@@ -0,0 +1,361 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#ifdef LIBJVM
+
+#include <jni.h>
+#include <unistd.h>
+
+#include <cstdint>
+#include <memory>
+
+#include "common/status.h"
+#include "gen_cpp/Exprs_types.h"
+#include "runtime/user_function_cache.h"
+#include "util/jni-util.h"
+#include "vec/aggregate_functions/aggregate_function.h"
+#include "vec/columns/column_string.h"
+#include "vec/common/exception.h"
+#include "vec/common/string_ref.h"
+#include "vec/core/block.h"
+#include "vec/core/column_numbers.h"
+#include "vec/core/field.h"
+#include "vec/core/types.h"
+#include "vec/data_types/data_type_string.h"
+#include "vec/io/io_helper.h"
+
+namespace doris::vectorized {
+
+// JNI descriptors for org.apache.doris.udf.UdafExecutor. The class name is slash-separated;
+// a method descriptor has the form "(<argument types>)<return type>" with type codes
+// V = void, Z = boolean, J = long, [B = byte[], Lpkg/Name; = an object of class pkg.Name.
+// See the JNI specification, "Type Signatures":
+// https://docs.oracle.com/javase/8/docs/technotes/guides/jni/spec/types.html#type_signatures
+// `constexpr` gives these header constants internal linkage, so including this header from
+// several translation units does not produce duplicate-symbol link errors.
+constexpr const char* UDAF_EXECUTOR_CLASS = "org/apache/doris/udf/UdafExecutor";
+// UdafExecutor(byte[]): receives the serialized TJavaUdfExecutorCtorParams.
+constexpr const char* UDAF_EXECUTOR_CTOR_SIGNATURE = "([B)V";
+// void close()
+constexpr const char* UDAF_EXECUTOR_CLOSE_SIGNATURE = "()V";
+// void add(long, long): receives the row range given to add().
+constexpr const char* UDAF_EXECUTOR_ADD_SIGNATURE = "(JJ)V";
+// void serialize(Object): invoked with a direct ByteBuffer.
+constexpr const char* UDAF_EXECUTOR_SERIALIZE_SIGNATURE = "(Ljava/lang/Object;)V";
+// void merge(Object): invoked with a direct ByteBuffer.
+constexpr const char* UDAF_EXECUTOR_MERGE_SIGNATURE = "(Ljava/lang/Object;)V";
+// boolean getValue(long): receives the output row index.
+constexpr const char* UDAF_EXECUTOR_RESULT_SIGNATURE = "(J)Z";
+
+struct AggregateJavaUdafData {
+public:
+    AggregateJavaUdafData() = default;
+    // Allocates the pointer-sized slots whose addresses are handed to the Java executor in
+    // init_udaf(): one input slot per argument for values, null maps and string offsets, and
+    // single output slots. All slots start zeroed, so the executor never observes
+    // indeterminate values, e.g. offsets for arguments that are not strings.
+    AggregateJavaUdafData(int64_t num_args) {
+        argument_size = num_args;
+        first_init = true;
+        input_values_buffer_ptr = std::make_unique<int64_t[]>(num_args);
+        input_nulls_buffer_ptr = std::make_unique<int64_t[]>(num_args);
+        input_offsets_ptrs = std::make_unique<int64_t[]>(num_args);
+        output_value_buffer = std::make_unique<int64_t>();
+        output_null_value = std::make_unique<int64_t>();
+        output_offsets_ptr = std::make_unique<int64_t>();
+        output_intermediate_state_ptr = std::make_unique<int64_t>();
+    }
+
+    // Closes the Java executor and releases the global reference created in init_udaf().
+    // NOTE(review): this also runs for states whose init_udaf() never completed (first_init
+    // still set), where executor_obj was never assigned; confirm callers always initialize.
+    // NOTE(review): executor_cl (from JniUtil::GetGlobalClassRef) is not released here;
+    // confirm whether that reference is shared or leaks once per state.
+    ~AggregateJavaUdafData() {
+        JNIEnv* env;
+        Status status;
+        RETURN_IF_STATUS_ERROR(status, JniUtil::GetJNIEnv(&env));
+        env->CallNonvirtualVoidMethod(executor_obj, executor_cl, executor_close_id);
+        // DeleteGlobalRef may be called while an exception is pending, so release the executor
+        // before checking close()'s outcome; a failing close() can no longer leak it.
+        env->DeleteGlobalRef(executor_obj);
+        RETURN_IF_STATUS_ERROR(status, JniUtil::GetJniExceptionMsg(env));
+    }
+
+    // Creates this state's Java UdafExecutor on first use; later calls are no-ops.
+    // It resolves the executor class and method ids, asks UserFunctionCache for a local jar
+    // path, and constructs the executor from a serialized TJavaUdfExecutorCtorParams that
+    // carries the raw addresses of the input/output slots owned by this object.
+    // NOTE(review): the class reference and method ids are resolved again for every state.
+    Status init_udaf(const TFunction& fn) {
+        // first_init stays true until the executor has been created successfully.
+        if (first_init) {
+            JNIEnv* env = nullptr;
+            RETURN_NOT_OK_STATUS_WITH_WARN(JniUtil::GetJNIEnv(&env),
+                                           "Java-Udaf init_udaf function");
+            RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, UDAF_EXECUTOR_CLASS, &executor_cl));
+            RETURN_NOT_OK_STATUS_WITH_WARN(register_func_id(env),
+                                           "Java-Udaf register_func_id function");
+
+            // Add a scoped cleanup jni reference object. This cleans up local refs made below.
+            JniLocalFrame jni_frame;
+            {
+                std::string local_location;
+                auto function_cache = UserFunctionCache::instance();
+                // Local path of the jar identified by fn.id / fn.hdfs_location / fn.checksum.
+                RETURN_IF_ERROR(function_cache->get_jarpath(fn.id, fn.hdfs_location, fn.checksum,
+                                                            &local_location));
+                // The slots are passed as raw addresses (cast to int64_t); the executor presumably
+                // dereferences them later, so they must outlive it.
+                TJavaUdfExecutorCtorParams ctor_params;
+                ctor_params.__set_fn(fn);
+                ctor_params.__set_location(local_location);
+                ctor_params.__set_input_offsets_ptrs((int64_t)input_offsets_ptrs.get());
+                ctor_params.__set_input_buffer_ptrs((int64_t)input_values_buffer_ptr.get());
+                ctor_params.__set_input_nulls_ptrs((int64_t)input_nulls_buffer_ptr.get());
+                ctor_params.__set_output_buffer_ptr((int64_t)output_value_buffer.get());
+
+                ctor_params.__set_output_null_ptr((int64_t)output_null_value.get());
+                ctor_params.__set_output_offsets_ptr((int64_t)output_offsets_ptr.get());
+                ctor_params.__set_output_intermediate_state_ptr(
+                        (int64_t)output_intermediate_state_ptr.get());
+
+                jbyteArray ctor_params_bytes;
+
+                // Pushed frame will be popped when jni_frame goes out-of-scope.
+                RETURN_IF_ERROR(jni_frame.push(env));
+                RETURN_IF_ERROR(SerializeThriftMsg(env, &ctor_params, &ctor_params_bytes));
+                executor_obj = env->NewObject(executor_cl, executor_ctor_id, ctor_params_bytes);
+            }
+            RETURN_ERROR_IF_EXC(env);
+            // NewObject returned a local reference that dies when jni_frame is popped on return;
+            // replace it in place with a global reference (deleted by the destructor).
+            RETURN_IF_ERROR(JniUtil::LocalToGlobalRef(env, executor_obj, &executor_obj));
+            first_init = false;
+        }
+        return Status::OK();
+    }
+
+    // Runs UdafExecutor.add() over the rows between row_num_start and row_num_end, passed
+    // through unchanged. Column data is not copied. The raw addresses of each argument's value
+    // buffer, null map and, for strings, offsets are written into the per-argument slots shared
+    // with the executor. Unsupported argument types yield InvalidArgument; a Java exception is
+    // returned as an error Status.
+    Status add(const IColumn** columns, size_t row_num_start, size_t row_num_end,
+               const DataTypes& argument_types) {
+        JNIEnv* env = nullptr;
+        RETURN_NOT_OK_STATUS_WITH_WARN(JniUtil::GetJNIEnv(&env), "Java-Udaf add function");
+        for (int arg_idx = 0; arg_idx < argument_size; ++arg_idx) {
+            auto data_col = columns[arg_idx];
+            if (auto* nullable = check_and_get_column<const ColumnNullable>(*columns[arg_idx])) {
+                // Nullable argument: publish its null map and look through to the values.
+                data_col = nullable->get_nested_column_ptr();
+                auto null_col = check_and_get_column<ColumnVector<UInt8>>(
+                        nullable->get_null_map_column_ptr());
+                input_nulls_buffer_ptr.get()[arg_idx] =
+                        reinterpret_cast<int64_t>(null_col->get_data().data());
+            } else {
+                // -1 marks an argument without a null map.
+                input_nulls_buffer_ptr.get()[arg_idx] = -1;
+            }
+            if (data_col->is_column_string()) {
+                // Strings need both the chars buffer and the offsets array.
+                const ColumnString* str_col = check_and_get_column<ColumnString>(data_col);
+                input_values_buffer_ptr.get()[arg_idx] =
+                        reinterpret_cast<int64_t>(str_col->get_chars().data());
+                input_offsets_ptrs.get()[arg_idx] =
+                        reinterpret_cast<int64_t>(str_col->get_offsets().data());
+            } else if (data_col->is_numeric() || data_col->is_column_decimal()) {
+                input_values_buffer_ptr.get()[arg_idx] =
+                        reinterpret_cast<int64_t>(data_col->get_raw_data().data);
+            } else {
+                return Status::InvalidArgument(
+                        strings::Substitute("Java UDAF doesn't support type is $0 now !",
+                                            argument_types[arg_idx]->get_name()));
+            }
+        }
+        // The Java method takes two longs ("(JJ)V"); pass exact jlong values through varargs.
+        env->CallNonvirtualVoidMethod(executor_obj, executor_cl, executor_add_id,
+                                      static_cast<jlong>(row_num_start),
+                                      static_cast<jlong>(row_num_end));
+        // Report an exception thrown by UdafExecutor.add() instead of leaving it pending.
+        return JniUtil::GetJniExceptionMsg(env);
+    }
+
+    // Merges the serialized state carried by `rhs` into this aggregate's Java
+    // executor. An `rhs` that was never initialized (`first_init`) is skipped.
+    //
+    // Returns an error if the JNI environment is unavailable or the Java
+    // call threw.
+    Status merge(const AggregateJavaUdafData& rhs) {
+        if (rhs.first_init) {
+            return Status::OK();
+        }
+        JNIEnv* env = nullptr;
+        RETURN_NOT_OK_STATUS_WITH_WARN(JniUtil::GetJNIEnv(&env), "Java-Udaf merge function");
+        // Copy into our own buffer: the direct ByteBuffer below aliases this
+        // memory while Java reads it.
+        serialize_data = rhs.serialize_data;
+        long len = serialize_data.length();
+        jobject data = env->NewDirectByteBuffer(serialize_data.data(), len);
+        RETURN_ERROR_IF_EXC(env);
+        env->CallNonvirtualVoidMethod(executor_obj, executor_cl, executor_merge_id, data);
+        // Local refs created on an attached native thread are not released
+        // until the thread detaches, so drop the wrapper explicitly.
+        env->DeleteLocalRef(data);
+        RETURN_ERROR_IF_EXC(env);
+        return Status::OK();
+    }
+
+    // Serializes this aggregate's state into `buf`.
+    //
+    // Layout: `first_init`, then the executor's serialized bytes. The byte
+    // count is read from the executor field identified by `serialize_len_id`,
+    // and the executor's serialize method then fills a direct ByteBuffer
+    // over `serialize_data`.
+    //
+    // Returns an error if the JNI environment is unavailable or a Java call
+    // threw; `buf` may then hold only a partial record.
+    Status write(BufferWritable& buf) {
+        write_binary(first_init, buf);
+        JNIEnv* env = nullptr;
+        RETURN_NOT_OK_STATUS_WITH_WARN(JniUtil::GetJNIEnv(&env), "Java-Udaf write function");
+        jlong len = env->GetLongField(executor_obj, serialize_len_id);
+        serialize_data.resize(len);
+        jobject data = env->NewDirectByteBuffer(serialize_data.data(), len);
+        RETURN_ERROR_IF_EXC(env);
+        env->CallNonvirtualVoidMethod(executor_obj, executor_cl, executor_serialize_id, data);
+        // Release the local ref eagerly; attached native threads do not free
+        // local refs automatically.
+        env->DeleteLocalRef(data);
+        RETURN_ERROR_IF_EXC(env);
+        write_binary(serialize_data, buf);
+        return Status::OK();
+    }
+
+    // Restores the record produced by write(): the `first_init` flag followed
+    // by the executor's serialized bytes. The bytes are only buffered here;
+    // presumably they reach Java later through merge(), which consumes
+    // `rhs.serialize_data`.
+    void read(BufferReadable& buf) {
+        read_binary(first_init, buf);     // field 1 of write()'s layout
+        read_binary(serialize_data, buf); // field 2 of write()'s layout
+    }
+
+    // Appends this aggregate's final value to `to` as a new row.
+    //
+    // A default row is inserted first. The Java executor's result method is
+    // then called with that row's index and writes the value into the column
+    // memory whose addresses are stored in the output pointer buffers. For
+    // string results the call returns false when the reserved space is too
+    // small; the chars buffer is then grown and the call retried.
+    //
+    // Returns InvalidArgument for an unsupported result type, or an error if
+    // the JNI environment is unavailable or the Java call threw.
+    Status get(IColumn& to, const DataTypePtr& result_type) const {
+        to.insert_default();
+        JNIEnv* env = nullptr;
+        RETURN_NOT_OK_STATUS_WITH_WARN(JniUtil::GetJNIEnv(&env), "Java-Udaf get value function");
+        // Index of the row inserted above (same for a nullable column and its
+        // nested column).
+        const jlong row_idx = static_cast<jlong>(to.size()) - 1;
+
+        // Calls the executor's result method exactly once per attempt against
+        // `data_col`, the column that stores the value itself (the nested
+        // column when the result is nullable).
+        auto evaluate = [&](IColumn& data_col) -> Status {
+            if (data_col.is_column_string()) {
+                // Cast `data_col`, not `to`: for a nullable result `to` is a
+                // ColumnNullable and the cast would yield nullptr.
+                const ColumnString* str_col = check_and_get_column<ColumnString>(data_col);
+                auto& chars = const_cast<ColumnString::Chars&>(str_col->get_chars());
+                auto& offsets = const_cast<ColumnString::Offsets&>(str_col->get_offsets());
+                *output_offsets_ptr = reinterpret_cast<int64_t>(offsets.data());
+                int increase_buffer_size = 0;
+                while (true) {
+                    // Re-publish the chars address on every attempt: resizing
+                    // `chars` may reallocate and leave the old address dangling.
+                    *output_value_buffer = reinterpret_cast<int64_t>(chars.data());
+                    *output_intermediate_state_ptr = chars.size();
+                    jboolean res = env->CallNonvirtualBooleanMethod(
+                            executor_obj, executor_cl, executor_result_id, row_idx);
+                    // Check before trusting `res`: after an exception the
+                    // return value is meaningless and retrying would grow the
+                    // buffer without bound.
+                    RETURN_ERROR_IF_EXC(env);
+                    if (res == JNI_TRUE) {
+                        return Status::OK();
+                    }
+                    chars.resize(chars.size() + IncreaseReservedBufferSize(increase_buffer_size));
+                    ++increase_buffer_size;
+                }
+            }
+            if (data_col.is_numeric() || data_col.is_column_decimal()) {
+                *output_value_buffer = reinterpret_cast<int64_t>(data_col.get_raw_data().data);
+                env->CallNonvirtualBooleanMethod(executor_obj, executor_cl, executor_result_id,
+                                                 row_idx);
+                RETURN_ERROR_IF_EXC(env);
+                return Status::OK();
+            }
+            return Status::InvalidArgument(strings::Substitute(
+                    "Java UDAF doesn't support return type is $0 now !", result_type->get_name()));
+        };
+
+        if (result_type->is_nullable()) {
+            auto& nullable = assert_cast<ColumnNullable&>(to);
+            *output_null_value =
+                    reinterpret_cast<int64_t>(nullable.get_null_map_column().get_raw_data().data);
+            return evaluate(nullable.get_nested_column());
+        }
+        // -1 in the null slot: the result column has no null map.
+        *output_null_value = -1;
+        return evaluate(to);
+    }
+
+    static const int32_t INITIAL_RESERVED_BUFFER_SIZE = 1024;
+    // TODO: we need a heuristic strategy to increase buffer size for variable-size output.
+    // Returns how many extra bytes to reserve on retry `n` when a variable-size
+    // result did not fit: INITIAL_RESERVED_BUFFER_SIZE << n, with `n` clamped
+    // to [0, 20]. 1024 << 20 (1 GiB) is the largest step that fits in int32_t;
+    // larger (or negative) shifts would be undefined behavior.
+    static inline int32_t IncreaseReservedBufferSize(int n) {
+        constexpr int kMaxShift = 20;
+        if (n < 0) {
+            n = 0;
+        } else if (n > kMaxShift) {
+            n = kMaxShift;
+        }
+        return INITIAL_RESERVED_BUFFER_SIZE << n;
+    }
+
+    Status register_func_id(JNIEnv* env) {

Review Comment:
   This is only called inside the class, so maybe it should be `private` and renamed to `_register_func_id`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] morningman commented on pull request #9930: [Vectorized][UDF] support java-udaf

Posted by GitBox <gi...@apache.org>.
morningman commented on PR #9930:
URL: https://github.com/apache/incubator-doris/pull/9930#issuecomment-1144580502

   Hi @zhangstar333, could you explain in more detail what this PR does?
   We already support part of the Java UDF feature, so please explain what is new in your PR.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] BiteTheDDDDt commented on a diff in pull request #9930: [Vectorized][UDF] support java-udaf

Posted by GitBox <gi...@apache.org>.
BiteTheDDDDt commented on code in PR #9930:
URL: https://github.com/apache/incubator-doris/pull/9930#discussion_r887649309


##########
be/src/vec/aggregate_functions/aggregate_function_java_udaf.h:
##########
@@ -0,0 +1,392 @@
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#ifdef LIBJVM
+
+#include <jni.h>
+#include <unistd.h>
+
+#include <cstdint>
+#include <memory>
+
+#include "common/status.h"
+#include "gen_cpp/Exprs_types.h"
+#include "runtime/user_function_cache.h"
+#include "util/jni-util.h"
+#include "vec/aggregate_functions/aggregate_function.h"
+#include "vec/columns/column_string.h"
+#include "vec/common/exception.h"
+#include "vec/common/string_ref.h"
+#include "vec/core/block.h"
+#include "vec/core/column_numbers.h"
+#include "vec/core/field.h"
+#include "vec/core/types.h"
+#include "vec/data_types/data_type_string.h"
+#include "vec/io/io_helper.h"
+
+namespace doris::vectorized {
+
+// JNI class name and method signatures of the Java-side UDAF executor.
+// `constexpr` makes these header-level constants const and gives them internal
+// linkage. A plain `const char* X = ...` is a mutable pointer with external
+// linkage, which yields duplicate-symbol link errors as soon as this header
+// is included by more than one translation unit.
+constexpr const char* UDAF_EXECUTOR_CLASS = "org/apache/doris/udf/UdafExecutor";
+constexpr const char* UDAF_EXECUTOR_CTOR_SIGNATURE = "([B)V";
+constexpr const char* UDAF_EXECUTOR_CLOSE_SIGNATURE = "()V";
+constexpr const char* UDAF_EXECUTOR_CREATE_SIGNATURE = "()Ljava/lang/Object;";
+constexpr const char* UDAF_EXECUTOR_DESTORY_SIGNATURE = "()V";
+constexpr const char* UDAF_EXECUTOR_ADD_SIGNATURE = "(J)V";
+constexpr const char* UDAF_EXECUTOR_MERGE_SIGNATURE = "(Ljava/lang/Object;)V";
+constexpr const char* UDAF_EXECUTOR_SERIALIZE_SIGNATURE = "(Ljava/lang/Object;)V";
+constexpr const char* UDAF_EXECUTOR_DESERIALIZE_SIGNATURE = "(Ljava/lang/Object;)V";
+constexpr const char* UDAF_EXECUTOR_RESULT_SIGNATURE = "(J)Z";
+
+struct AggregateJavaUdafData {
+public:
+    AggregateJavaUdafData() = default;
+
+    // Allocates the scratch slots whose addresses are later handed to the Java
+    // executor by init_udaf(). `num_args` sizes the three per-argument tables
+    // (value, null-map and offset addresses); every other slot holds a single
+    // value. The executor itself is created later, by init_udaf().
+    AggregateJavaUdafData(int64_t num_args) {
+        first_init = true;
+        argument_size = num_args;
+
+        // Per-argument address tables.
+        input_nulls_buffer_ptr.reset(new int64_t[num_args]);
+        input_offsets_ptrs.reset(new int64_t[num_args]);
+        input_values_buffer_ptr.reset(new int64_t[num_args]);
+
+        // Single-value batch / output slots.
+        batch_size_ptr.reset(new int32_t);
+        output_null_value.reset(new int64_t);
+        output_value_buffer.reset(new int64_t);
+        output_offsets_ptr.reset(new int64_t);
+        output_intermediate_state_ptr.reset(new int64_t);
+    }
+
+    // Closes the Java executor, if one was created, and releases the global
+    // reference held on it. A Java exception raised by the close call is
+    // logged rather than propagated.
+    ~AggregateJavaUdafData() {
+        // Nothing to release when init_udaf() never created the executor;
+        // calling a Java method on a null object is a fatal JNI error.
+        // NOTE(review): relies on executor_obj being nullptr-initialized in its
+        // declaration (not visible here) - TODO confirm.
+        if (executor_obj == nullptr) {
+            return;
+        }
+        JNIEnv* env = nullptr;
+        Status status;
+        RETURN_IF_STATUS_ERROR(status, JniUtil::GetJNIEnv(&env));
+        env->CallNonvirtualVoidMethod(executor_obj, executor_cl_, executor_close_id_);
+        Status s = JniUtil::GetJniExceptionMsg(env);
+        if (!s.ok()) {
+            LOG(WARNING) << "meet some error in destroy: " << s.get_error_msg();
+        }
+        env->DeleteGlobalRef(executor_obj);
+    }
+
+    // Creates the Java executor for `fn` on the first call; later calls are
+    // no-ops.
+    //
+    // Resolves the executor class and method IDs, gets the local path of the
+    // UDF jar from UserFunctionCache, and constructs the executor from a
+    // Thrift-serialized TJavaUdfExecutorCtorParams that carries the addresses
+    // of this state's scratch buffers. The new object is converted to a global
+    // reference so it remains valid after the local JNI frame is popped.
+    //
+    // Returns an error if the JVM is unavailable, method registration fails,
+    // the jar path cannot be resolved, or construction raises an exception.
+    Status init_udaf(const TFunction& fn) {
+        if (first_init) {
+            JNIEnv* env = nullptr;
+            RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env));
+            if (env == nullptr) {
+                return Status::InternalError("Failed to get/create JVM");
+            }
+            RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, UDAF_EXECUTOR_CLASS, &executor_cl_));
+
+            // register_func_id() presumably resolves the method IDs used below
+            // (e.g. executor_ctor_id_) and by every other method. Continuing
+            // after a failure would hand JNI an invalid ID, so abort instead
+            // of only logging.
+            Status ret_code = register_func_id(env);
+            if (!ret_code.ok()) {
+                LOG(WARNING) << "register_func_id has error : " << ret_code.get_error_msg();
+                return ret_code;
+            }
+
+            // Add a scoped cleanup jni reference object. This cleans up local refs made below.
+            JniLocalFrame jni_frame;
+            {
+                std::string local_location;
+                auto function_cache = UserFunctionCache::instance();
+                RETURN_IF_ERROR(function_cache->get_jarpath(fn.id, fn.hdfs_location, fn.checksum,
+                                                            &local_location));
+                TJavaUdfExecutorCtorParams ctor_params;
+                ctor_params.__set_fn(fn);
+                ctor_params.__set_location(local_location);
+                ctor_params.__set_input_offsets_ptrs((int64_t)input_offsets_ptrs.get());
+                ctor_params.__set_input_buffer_ptrs((int64_t)input_values_buffer_ptr.get());
+                ctor_params.__set_input_nulls_ptrs((int64_t)input_nulls_buffer_ptr.get());
+                ctor_params.__set_output_buffer_ptr((int64_t)output_value_buffer.get());
+
+                ctor_params.__set_output_null_ptr((int64_t)output_null_value.get());
+                ctor_params.__set_output_offsets_ptr((int64_t)output_offsets_ptr.get());
+                ctor_params.__set_output_intermediate_state_ptr(
+                        (int64_t)output_intermediate_state_ptr.get());
+                ctor_params.__set_batch_size_ptr((int64_t)batch_size_ptr.get());
+
+                jbyteArray ctor_params_bytes;
+
+                // Pushed frame will be popped when jni_frame goes out-of-scope.
+                RETURN_IF_ERROR(jni_frame.push(env));
+                RETURN_IF_ERROR(SerializeThriftMsg(env, &ctor_params, &ctor_params_bytes));
+                executor_obj = env->NewObject(executor_cl_, executor_ctor_id_, ctor_params_bytes);
+            }
+            RETURN_ERROR_IF_EXC(env);
+            RETURN_IF_ERROR(JniUtil::LocalToGlobalRef(env, executor_obj, &executor_obj));
+            first_init = false;
+        }
+        return Status::OK();
+    }
+
+    // Publishes the input column addresses to the Java executor and invokes
+    // its `add` method for row `row_num`.
+    //
+    // This method returns void and cannot report errors, so on failure it logs
+    // a warning and skips the Java call. Proceeding would dereference a null
+    // JNIEnv or hand the executor stale column addresses.
+    void add(const IColumn** columns, size_t row_num, const DataTypes& argument_types) const {
+        JNIEnv* env = nullptr;
+        Status ret_code = JniUtil::GetJNIEnv(&env);
+        if (env == nullptr || !ret_code.ok()) {
+            LOG(WARNING) << "Java-Udaf get error when add: " << ret_code.get_error_msg();
+            return;
+        }
+        *batch_size_ptr = columns[0]->size();
+        for (int arg_idx = 0; arg_idx < argument_size; ++arg_idx) {
+            auto data_col = columns[arg_idx];
+            if (auto* nullable = check_and_get_column<const ColumnNullable>(*columns[arg_idx])) {
+                data_col = nullable->get_nested_column_ptr();
+                auto null_col = check_and_get_column<ColumnVector<UInt8>>(
+                        nullable->get_null_map_column_ptr());
+                input_nulls_buffer_ptr.get()[arg_idx] =
+                        reinterpret_cast<int64_t>(null_col->get_data().data());
+            } else {
+                // -1 marks a non-nullable argument.
+                input_nulls_buffer_ptr.get()[arg_idx] = -1;
+            }
+            if (data_col->is_column_string()) {
+                const ColumnString* str_col = check_and_get_column<ColumnString>(data_col);
+                input_values_buffer_ptr.get()[arg_idx] =
+                        reinterpret_cast<int64_t>(str_col->get_chars().data());
+                input_offsets_ptrs.get()[arg_idx] =
+                        reinterpret_cast<int64_t>(str_col->get_offsets().data());
+            } else if (data_col->is_numeric() || data_col->is_column_decimal()) {
+                input_values_buffer_ptr.get()[arg_idx] =
+                        reinterpret_cast<int64_t>(data_col->get_raw_data().data);
+            } else {
+                LOG(WARNING) << "Java UDAF doesn't support type: "
+                             << argument_types[arg_idx]->get_name() << " now !";
+                return;
+            }
+        }
+        env->CallNonvirtualVoidMethod(executor_obj, executor_cl_, executor_add_id_,
+                                      static_cast<jlong>(row_num));
+        // Surface any Java exception instead of leaving it pending for the
+        // next JNI call on this thread.
+        Status s = JniUtil::GetJniExceptionMsg(env);
+        if (!s.ok()) {
+            LOG(WARNING) << "Java-Udaf add function meet error: " << s.get_error_msg();
+        }
+    }
+
+    // Merges the serialized state carried by `rhs` into this aggregate's Java
+    // executor. An `rhs` that was never initialized (`first_init`) is skipped.
+    // Failures are logged; this method cannot report them to the caller.
+    void merge(const AggregateJavaUdafData& rhs) {
+        if (rhs.first_init) {
+            return;
+        }
+        JNIEnv* env = nullptr;
+        Status ret_code = JniUtil::GetJNIEnv(&env);
+        if (env == nullptr || !ret_code.ok()) {
+            LOG(WARNING) << "Java-Udaf get error when merge: " << ret_code.get_error_msg();
+            return;
+        }
+        // Own copy: the direct ByteBuffer below aliases this memory.
+        serialize_data = rhs.serialize_data;
+        long len = serialize_data.length();
+        jobject data = env->NewDirectByteBuffer(serialize_data.data(), len);
+        env->CallNonvirtualVoidMethod(executor_obj, executor_cl_, executor_merge_id_, data);
+        // Local refs on an attached native thread live until the thread
+        // detaches; release this one eagerly.
+        env->DeleteLocalRef(data);
+        Status s = JniUtil::GetJniExceptionMsg(env);
+        if (!s.ok()) {
+            LOG(WARNING) << "Java-Udaf merge function meet error: " << s.get_error_msg();
+        }
+    }
+
+    // Serializes this aggregate's state into `buf` as `first_init` followed by
+    // the executor's serialized bytes. The byte count is read from the
+    // executor field identified by `serialize_len_id`, and the executor then
+    // fills a direct ByteBuffer over `serialize_data`.
+    void write(BufferWritable& buf) {
+        write_binary(first_init, buf);
+        JNIEnv* env = nullptr;
+        Status ret_code = JniUtil::GetJNIEnv(&env);
+        if (env == nullptr || !ret_code.ok()) {
+            LOG(WARNING) << "Java-Udaf get error when write: " << ret_code.get_error_msg();
+            // Still emit an (empty) payload so the reader's second read_binary()
+            // stays aligned with the stream.
+            serialize_data.clear();
+            write_binary(serialize_data, buf);
+            return;
+        }
+        jlong len = env->GetLongField(executor_obj, serialize_len_id);
+        serialize_data.resize(len);
+        jobject data = env->NewDirectByteBuffer(serialize_data.data(), len);
+        env->CallNonvirtualVoidMethod(executor_obj, executor_cl_, executor_serialize_id_, data);
+        env->DeleteLocalRef(data);
+        Status s = JniUtil::GetJniExceptionMsg(env);
+        if (!s.ok()) {
+            LOG(WARNING) << "Java-Udaf write function meet error: " << s.get_error_msg();
+        }
+        write_binary(serialize_data, buf);
+    }
+
+    // Restores the record written by write() and hands the serialized bytes
+    // to the Java executor's deserialize method. Failures are logged; this
+    // method cannot report them to the caller.
+    void read(BufferReadable& buf) {
+        // Consume both fields before anything can bail out, so the stream
+        // position always advances past this record (same order as write()).
+        read_binary(first_init, buf);
+        read_binary(serialize_data, buf);
+        JNIEnv* env = nullptr;
+        Status ret_code = JniUtil::GetJNIEnv(&env);
+        if (env == nullptr || !ret_code.ok()) {
+            LOG(WARNING) << "Java-Udaf get error when read: " << ret_code.get_error_msg();
+            return;
+        }
+        long len = serialize_data.length();
+        jobject data = env->NewDirectByteBuffer(serialize_data.data(), len);
+        env->CallNonvirtualVoidMethod(executor_obj, executor_cl_, executor_deserialize_id_, data);
+        env->DeleteLocalRef(data);
+        Status s = JniUtil::GetJniExceptionMsg(env);
+        if (!s.ok()) {
+            LOG(WARNING) << "Java-Udaf read function meet error: " << s.get_error_msg();
+        }
+    }
+
+    void get(IColumn& to, const DataTypePtr& result_type) const {
+        to.insert_default();
+        JNIEnv* env = nullptr;
+        Status ret_code = JniUtil::GetJNIEnv(&env);
+        if (env == nullptr || !ret_code.ok()) {
+            LOG(WARNING) << "Java-Udaf get error when get value: " << ret_code.get_error_msg();
+        }
+        if (result_type->is_nullable()) {
+            auto& nullable = assert_cast<ColumnNullable&>(to);
+            *output_null_value =
+                    reinterpret_cast<int64_t>(nullable.get_null_map_column().get_raw_data().data);
+            auto& data_col = nullable.get_nested_column();
+
+#ifndef EVALUATE_JAVA_UDAF
+#define EVALUATE_JAVA_UDAF                                                                   \
+    if (data_col.is_column_string()) {                                                       \
+        const ColumnString* str_col = check_and_get_column<ColumnString>(to);                \
+        ColumnString::Chars& chars = const_cast<ColumnString::Chars&>(str_col->get_chars()); \
+        ColumnString::Offsets& offsets =                                                     \
+                const_cast<ColumnString::Offsets&>(str_col->get_offsets());                  \
+        int increase_buffer_size = 0;                                                        \
+        int32_t buffer_size = IncreaseReservedBufferSize(increase_buffer_size);              \
+        chars.reserve(chars.size() + buffer_size);                                           \
+        chars.resize(chars.size() + buffer_size);                                            \
+        *output_value_buffer = reinterpret_cast<int64_t>(chars.data());                      \
+        *output_offsets_ptr = reinterpret_cast<int64_t>(offsets.data());                     \
+        *output_intermediate_state_ptr = buffer_size;                                        \
+        jboolean res = env->CallNonvirtualBooleanMethod(executor_obj, executor_cl_,          \
+                                                        executor_result_id_, to.size() - 1); \
+        while (res != JNI_TRUE) {                                                            \
+            increase_buffer_size++;                                                          \
+            int32_t buffer_size = IncreaseReservedBufferSize(increase_buffer_size);          \
+            chars.resize(chars.size() + buffer_size);                                        \
+            *output_value_buffer = reinterpret_cast<int64_t>(chars.data());                  \
+            *output_intermediate_state_ptr = buffer_size;                                    \
+            res = env->CallNonvirtualBooleanMethod(executor_obj, executor_cl_,               \
+                                                   executor_result_id_, to.size() - 1);      \
+        }                                                                                    \
+    } else if (data_col.is_numeric() || data_col.is_column_decimal()) {                      \
+        *output_value_buffer = reinterpret_cast<int64_t>(data_col.get_raw_data().data);      \
+        env->CallNonvirtualBooleanMethod(executor_obj, executor_cl_, executor_result_id_,    \
+                                         to.size() - 1);                                     \
+    } else {                                                                                 \
+        LOG(WARNING) << "Java UDAF doesn't support return type: " << result_type->get_name() \
+                     << "$0 now !";                                                          \
+    }
+#endif
+            EVALUATE_JAVA_UDAF;

Review Comment:
   Do you just want a lambda function?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org