Posted to commits@doris.apache.org by GitBox <gi...@apache.org> on 2021/12/29 02:55:54 UTC

[GitHub] [incubator-doris] yangzhg opened a new pull request #7519: …

yangzhg opened a new pull request #7519:
URL: https://github.com/apache/incubator-doris/pull/7519


   ## Proposed changes
   
   This PR adds support for UDFs served over the GRPC protocol. This brings several benefits:
   
   1. The UDF implementation language is no longer limited to C++; users can implement a UDF in any language they are familiar with.
   2. The UDF is decoupled from Doris: a crash in the UDF does not cause a Doris core dump, and UDF computing resources are separate from Doris, so Doris services are not affected.
   
   However, an RPC UDF has a fixed overhead, so it is much slower than a C++ UDF, especially when the amount of data is large.
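
To make the fixed-overhead point concrete, here is a minimal cost-model sketch. The struct, the function name and every number used with it are hypothetical and chosen only for illustration; they are not measurements from Doris.

```cpp
#include <cstdint>

// Hypothetical cost model: every RPC pays a fixed overhead (serialization,
// network round trip) on top of the per-row work done by the function itself.
struct RpcCostModel {
    double fixed_overhead_us; // assumed cost of one round trip, in microseconds
    double per_row_us;        // assumed cost of evaluating one row
};

// Total time for `rows` rows when each RPC carries `rows_per_call` rows.
double total_cost_us(const RpcCostModel& m, std::int64_t rows, std::int64_t rows_per_call) {
    std::int64_t calls = (rows + rows_per_call - 1) / rows_per_call; // ceiling division
    return static_cast<double>(calls) * m.fixed_overhead_us +
           static_cast<double>(rows) * m.per_row_us;
}
```

Under these assumed numbers, the overhead term grows with the number of calls, so a call per row is dominated by it, while carrying many rows per call amortizes it.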
   
   Create a function like this:
   
   ```
   CREATE FUNCTION rpc_add(INT, INT) RETURNS INT PROPERTIES (
     "SYMBOL"="add_int",
     "OBJECT_FILE"="127.0.0.1:9999",
     "MD5"=""
   );
   ```
   
   The function service needs to implement the `check_fn` and `fn_call` methods.
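
The description only names the two methods, so the following is a rough sketch of what a service exposing them might look like. `CallRequest`, `CallResponse`, `FunctionService` and `register_fn` are hypothetical stand-ins rather than the generated protobuf classes, and the semantics of `check_fn` (name and argument-count validation) are an assumption.

```cpp
#include <cstddef>
#include <cstdint>
#include <functional>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Simplified stand-ins for the request/response messages (hypothetical).
struct CallRequest {
    std::string function_name;
    std::vector<std::int32_t> args;
};
struct CallResponse {
    int status_code = 0; // 0 means success
    std::int32_t result = 0;
};

class FunctionService {
public:
    using Fn = std::function<std::int32_t(const std::vector<std::int32_t>&)>;

    void register_fn(const std::string& name, std::size_t arity, Fn fn) {
        _fns[name] = Entry{arity, std::move(fn)};
    }

    // check_fn: assumed to verify that the function exists with this arity.
    bool check_fn(const std::string& name, std::size_t arity) const {
        auto it = _fns.find(name);
        return it != _fns.end() && it->second.arity == arity;
    }

    // fn_call: look the function up by name and evaluate it.
    CallResponse fn_call(const CallRequest& req) const {
        CallResponse resp;
        if (!check_fn(req.function_name, req.args.size())) {
            resp.status_code = 1; // unknown function or wrong argument count
            return resp;
        }
        resp.result = _fns.at(req.function_name).fn(req.args);
        return resp;
    }

private:
    struct Entry {
        std::size_t arity;
        Fn fn;
    };
    std::map<std::string, Entry> _fns;
};
```

Registering `add_int` with two arguments would correspond to the `"SYMBOL"="add_int"` property in the statement above.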
   
   
   ## Types of changes
   
   What types of changes does your code introduce to Doris?
   _Put an `x` in the boxes that apply_
   
   - [ ] Bugfix (non-breaking change which fixes an issue)
   - [x] New feature (non-breaking change which adds functionality)
   - [ ] Breaking change (fix or feature that would cause existing functionality to not work as expected)
   - [ ] Documentation Update (if none of the other choices apply)
   - [ ] Code refactor (Modify the code structure, format the code, etc...)
   - [ ] Optimization. Including functional usability improvements and performance improvements.
   - [ ] Dependency. Such as changes related to third-party components.
   - [ ] Other.
   
   ## Checklist
   
   _Put an `x` in the boxes that apply. You can also fill these out after creating the PR. If you're unsure about any of them, don't hesitate to ask. We're here to help! This is simply a reminder of what we are going to look for before merging your code._
   
   - [ ] I have created an issue on (Fix #ISSUE) and described the bug/feature there in detail
   - [ ] Compiling and unit tests pass locally with my changes
   - [ ] I have added tests that prove my fix is effective or that my feature works
   - [ ] If these changes need document changes, I have updated the document
   - [ ] Any dependent changes have been merged
   
   ## Further comments
   
   If this is a relatively large or complex change, kick off the discussion at dev@doris.apache.org by explaining why you chose the solution you did and what alternatives you considered, etc...
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] yangzhg merged pull request #7519: [feature] Support udf through RPC

Posted by GitBox <gi...@apache.org>.
yangzhg merged pull request #7519:
URL: https://github.com/apache/incubator-doris/pull/7519


   




[GitHub] [incubator-doris] yangzhg commented on a change in pull request #7519: [feature] Support udf through RPC

Posted by GitBox <gi...@apache.org>.
yangzhg commented on a change in pull request #7519:
URL: https://github.com/apache/incubator-doris/pull/7519#discussion_r784459211



##########
File path: be/src/exprs/rpc_fn_call.cpp
##########
@@ -0,0 +1,329 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exprs/rpc_fn_call.h"
+
+#include "exprs/anyval_util.h"
+#include "exprs/expr_context.h"
+#include "fmt/format.h"
+#include "gen_cpp/function_service.pb.h"
+#include "runtime/runtime_state.h"
+#include "runtime/user_function_cache.h"
+#include "service/brpc.h"
+#include "util/brpc_client_cache.h"
+#include "util/defer_op.h"
+
+namespace doris {
+
+RPCFnCall::RPCFnCall(const TExprNode& node) : Expr(node), _fn_context_index(-1) {
+    DCHECK_EQ(_fn.binary_type, TFunctionBinaryType::RPC);
+}
+
+Status RPCFnCall::prepare(RuntimeState* state, const RowDescriptor& desc, ExprContext* context) {
+    RETURN_IF_ERROR(Expr::prepare(state, desc, context));
+    DCHECK(!_fn.scalar_fn.symbol.empty());
+
+    FunctionContext::TypeDesc return_type = AnyValUtil::column_type_to_type_desc(_type);
+    std::vector<FunctionContext::TypeDesc> arg_types;
+    bool char_arg = false;
+    for (int i = 0; i < _children.size(); ++i) {
+        arg_types.push_back(AnyValUtil::column_type_to_type_desc(_children[i]->type()));
+        char_arg = char_arg || (_children[i]->type().type == TYPE_CHAR);
+    }
+    _fn_context_index = context->register_func(state, return_type, arg_types, 0);
+
+    // _fn.scalar_fn.symbol
+    _rpc_function_symbol = _fn.scalar_fn.symbol;
+
+    _client = state->exec_env()->brpc_function_client_cache()->get_client(_fn.hdfs_location);
+
+    if (_client == nullptr) {
+        return Status::InternalError("rpc env init error");
+    }
+    return Status::OK();
+}
+
+Status RPCFnCall::open(RuntimeState* state, ExprContext* ctx,
+                       FunctionContext::FunctionStateScope scope) {
+    RETURN_IF_ERROR(Expr::open(state, ctx, scope));
+    return Status::OK();
+}
+
+void RPCFnCall::close(RuntimeState* state, ExprContext* context,
+                      FunctionContext::FunctionStateScope scope) {
+    Expr::close(state, context, scope);
+}
+
+Status RPCFnCall::_eval_children(ExprContext* context, TupleRow* row,
+                                 PFunctionCallResponse* response) {
+    PFunctionCallRequest request;
+    request.set_function_name(_rpc_function_symbol);
+    PArgs* args = request.mutable_args();
+    for (int i = 0; i < _children.size(); ++i) {
+        PParameter* param = args->add_values();
+        void* src_slot = context->get_value(_children[i], row);
+        PParameterType* ptype = param->mutable_type();
+        if (src_slot == nullptr) {
+            ptype->set_type(PParameterType::NULL_TYPE);
+            continue;
+        }
+        switch (_children[i]->type().type) {

Review comment:
       Yes! I will add it later.






[GitHub] [incubator-doris] yangzhg commented on a change in pull request #7519: [feature] Support udf through RPC

Posted by GitBox <gi...@apache.org>.
yangzhg commented on a change in pull request #7519:
URL: https://github.com/apache/incubator-doris/pull/7519#discussion_r784513130



##########
File path: be/src/exec/tablet_sink.cpp
##########
@@ -146,10 +147,8 @@ void NodeChannel::_cancel_with_msg(const std::string& msg) {
 Status NodeChannel::open_wait() {
     _open_closure->join();
     if (_open_closure->cntl.Failed()) {
-        if (!ExecEnv::GetInstance()->brpc_stub_cache()->available(_stub, _node_info.host,
-                                                                  _node_info.brpc_port)) {
-            ExecEnv::GetInstance()->brpc_stub_cache()->erase(_open_closure->cntl.remote_side());
-        }
+        ExecEnv::GetInstance()->brpc_internal_client_cache()->erase(

Review comment:
       Creating a new stub does not cost much.
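
The reasoning here is that a failed entry can be dropped unconditionally because rebuilding it is cheap. A minimal sketch of that erase-then-recreate pattern follows; `Stub` and `ClientCache` are hypothetical stand-ins, not the classes in `util/brpc_client_cache.h`.

```cpp
#include <map>
#include <memory>
#include <mutex>
#include <string>

// Hypothetical stand-in for an RPC stub; the real stub type is not shown here.
struct Stub {
    std::string address;
};

class ClientCache {
public:
    // Return the cached stub for `address`, creating one on first use.
    std::shared_ptr<Stub> get_client(const std::string& address) {
        std::lock_guard<std::mutex> lock(_mutex);
        auto it = _stubs.find(address);
        if (it != _stubs.end()) {
            return it->second;
        }
        auto stub = std::make_shared<Stub>(Stub{address});
        _stubs.emplace(address, stub);
        return stub;
    }

    // Drop the entry without probing availability; the next get_client()
    // call simply builds a fresh stub.
    void erase(const std::string& address) {
        std::lock_guard<std::mutex> lock(_mutex);
        _stubs.erase(address);
    }

private:
    std::mutex _mutex;
    std::map<std::string, std::shared_ptr<Stub>> _stubs;
};
```

Callers that still hold the old `shared_ptr` keep a valid object, so erasing an entry does not invalidate requests that are already in flight.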






[GitHub] [incubator-doris] stalary commented on a change in pull request #7519: [feature] Support udf through RPC

Posted by GitBox <gi...@apache.org>.
stalary commented on a change in pull request #7519:
URL: https://github.com/apache/incubator-doris/pull/7519#discussion_r780671738



##########
File path: be/src/exec/tablet_sink.cpp
##########
@@ -146,10 +147,8 @@ void NodeChannel::_cancel_with_msg(const std::string& msg) {
 Status NodeChannel::open_wait() {
     _open_closure->join();
     if (_open_closure->cntl.Failed()) {
-        if (!ExecEnv::GetInstance()->brpc_stub_cache()->available(_stub, _node_info.host,
-                                                                  _node_info.brpc_port)) {
-            ExecEnv::GetInstance()->brpc_stub_cache()->erase(_open_closure->cntl.remote_side());
-        }
+        ExecEnv::GetInstance()->brpc_internal_client_cache()->erase(

Review comment:
       Why is it no longer necessary to check whether the stub is unavailable?

##########
File path: be/src/exprs/rpc_fn_call.cpp
##########
@@ -0,0 +1,329 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exprs/rpc_fn_call.h"
+
+#include "exprs/anyval_util.h"
+#include "exprs/expr_context.h"
+#include "fmt/format.h"
+#include "gen_cpp/function_service.pb.h"
+#include "runtime/runtime_state.h"
+#include "runtime/user_function_cache.h"
+#include "service/brpc.h"
+#include "util/brpc_client_cache.h"
+#include "util/defer_op.h"
+
+namespace doris {
+
+RPCFnCall::RPCFnCall(const TExprNode& node) : Expr(node), _fn_context_index(-1) {
+    DCHECK_EQ(_fn.binary_type, TFunctionBinaryType::RPC);
+}
+
+Status RPCFnCall::prepare(RuntimeState* state, const RowDescriptor& desc, ExprContext* context) {
+    RETURN_IF_ERROR(Expr::prepare(state, desc, context));
+    DCHECK(!_fn.scalar_fn.symbol.empty());
+
+    FunctionContext::TypeDesc return_type = AnyValUtil::column_type_to_type_desc(_type);
+    std::vector<FunctionContext::TypeDesc> arg_types;
+    bool char_arg = false;
+    for (int i = 0; i < _children.size(); ++i) {
+        arg_types.push_back(AnyValUtil::column_type_to_type_desc(_children[i]->type()));
+        char_arg = char_arg || (_children[i]->type().type == TYPE_CHAR);
+    }
+    _fn_context_index = context->register_func(state, return_type, arg_types, 0);
+
+    // _fn.scalar_fn.symbol
+    _rpc_function_symbol = _fn.scalar_fn.symbol;
+
+    _client = state->exec_env()->brpc_function_client_cache()->get_client(_fn.hdfs_location);
+
+    if (_client == nullptr) {
+        return Status::InternalError("rpc env init error");
+    }
+    return Status::OK();
+}
+
+Status RPCFnCall::open(RuntimeState* state, ExprContext* ctx,
+                       FunctionContext::FunctionStateScope scope) {
+    RETURN_IF_ERROR(Expr::open(state, ctx, scope));
+    return Status::OK();
+}
+
+void RPCFnCall::close(RuntimeState* state, ExprContext* context,
+                      FunctionContext::FunctionStateScope scope) {
+    Expr::close(state, context, scope);
+}
+
+Status RPCFnCall::_eval_children(ExprContext* context, TupleRow* row,
+                                 PFunctionCallResponse* response) {
+    PFunctionCallRequest request;
+    request.set_function_name(_rpc_function_symbol);
+    PArgs* args = request.mutable_args();
+    for (int i = 0; i < _children.size(); ++i) {
+        PParameter* param = args->add_values();
+        void* src_slot = context->get_value(_children[i], row);
+        PParameterType* ptype = param->mutable_type();
+        if (src_slot == nullptr) {
+            ptype->set_type(PParameterType::NULL_TYPE);
+            continue;
+        }
+        switch (_children[i]->type().type) {

Review comment:
       Do we need to support `TYPE_STRING`?
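
A later revision of the same file, quoted further down in this thread, groups `TYPE_VARCHAR`, `TYPE_STRING` and `TYPE_CHAR` under one case. The sketch below shows that grouping in isolation; the two enums are hypothetical, reduced stand-ins for the real type enumerations.

```cpp
// Hypothetical subset of the primitive types referenced in the diff.
enum class PrimitiveType { TYPE_INT, TYPE_CHAR, TYPE_VARCHAR, TYPE_STRING };

// Wire-level type ids, loosely modelled on PGenericType::TypeId.
enum class WireType { INT32, STRING, UNKNOWN };

WireType to_wire_type(PrimitiveType t) {
    switch (t) {
    case PrimitiveType::TYPE_INT:
        return WireType::INT32;
    // All three string-like types share one wire representation.
    case PrimitiveType::TYPE_CHAR:
    case PrimitiveType::TYPE_VARCHAR:
    case PrimitiveType::TYPE_STRING:
        return WireType::STRING;
    }
    return WireType::UNKNOWN;
}
```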






[GitHub] [incubator-doris] stalary commented on pull request #7519: [feature] Support udf through RPC

Posted by GitBox <gi...@apache.org>.
stalary commented on pull request #7519:
URL: https://github.com/apache/incubator-doris/pull/7519#issuecomment-1008002567


   Do the RPC addresses support load balancing?




[GitHub] [incubator-doris] yangzhg commented on a change in pull request #7519: [feature] Support udf through RPC

Posted by GitBox <gi...@apache.org>.
yangzhg commented on a change in pull request #7519:
URL: https://github.com/apache/incubator-doris/pull/7519#discussion_r800267157



##########
File path: gensrc/proto/types.proto
##########
@@ -63,3 +67,150 @@ message PUniqueId {
     required int64 lo = 2;
 };
 
+message PGenericType {

Review comment:
       That's a good idea, but I think the datatype defined in `data.proto` cannot describe complex types very well; `PGenericType` + `PList` + `PMap` + `PField` + `PStruct` + `PDecimal` may be better. BTW, the datatype should not be too complex; it should be easy to use across languages.
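
To illustrate why a recursive descriptor handles complex types, here is a small in-memory sketch. `GenericType` and its helpers are hypothetical and mirror only a few of the `PGenericType` ids; they are not the generated protobuf API.

```cpp
#include <string>
#include <utility>
#include <vector>

// Hypothetical in-memory mirror of PGenericType; only a few ids are modelled.
struct GenericType {
    enum class Id { INT32, STRING, DECIMAL128, LIST, MAP };
    Id id = Id::INT32;
    std::vector<GenericType> children; // LIST: element type; MAP: key type, value type
    unsigned precision = 0;            // used by DECIMAL128 only
    unsigned scale = 0;                // used by DECIMAL128 only
};

GenericType scalar(GenericType::Id id) {
    GenericType t;
    t.id = id;
    return t;
}

GenericType decimal128(unsigned precision, unsigned scale) {
    GenericType t = scalar(GenericType::Id::DECIMAL128);
    t.precision = precision;
    t.scale = scale;
    return t;
}

GenericType list_of(GenericType element) {
    GenericType t = scalar(GenericType::Id::LIST);
    t.children.push_back(std::move(element));
    return t;
}

GenericType map_of(GenericType key, GenericType value) {
    GenericType t = scalar(GenericType::Id::MAP);
    t.children.push_back(std::move(key));
    t.children.push_back(std::move(value));
    return t;
}

// Render the nested type by recursing into the child descriptors.
std::string describe(const GenericType& t) {
    switch (t.id) {
    case GenericType::Id::INT32:
        return "INT32";
    case GenericType::Id::STRING:
        return "STRING";
    case GenericType::Id::DECIMAL128:
        return "DECIMAL128(" + std::to_string(t.precision) + "," + std::to_string(t.scale) + ")";
    case GenericType::Id::LIST:
        return "LIST<" + describe(t.children[0]) + ">";
    case GenericType::Id::MAP:
        return "MAP<" + describe(t.children[0]) + "," + describe(t.children[1]) + ">";
    }
    return "UNKNOWN";
}
```

Because each container type carries its element types as nested descriptors, arbitrarily deep combinations need no additional enum values.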






[GitHub] [incubator-doris] github-actions[bot] commented on pull request #7519: [feature] Support udf through RPC

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #7519:
URL: https://github.com/apache/incubator-doris/pull/7519#issuecomment-1031153214








[GitHub] [incubator-doris] stalary commented on pull request #7519: [feature] Support udf through RPC

Posted by GitBox <gi...@apache.org>.
stalary commented on pull request #7519:
URL: https://github.com/apache/incubator-doris/pull/7519#issuecomment-1005848456


   Let me see




[GitHub] [incubator-doris] morningman commented on a change in pull request #7519: [feature] Support udf through RPC

Posted by GitBox <gi...@apache.org>.
morningman commented on a change in pull request #7519:
URL: https://github.com/apache/incubator-doris/pull/7519#discussion_r800060844



##########
File path: be/src/exprs/rpc_fn_call.cpp
##########
@@ -0,0 +1,332 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exprs/rpc_fn_call.h"
+
+#include "exprs/anyval_util.h"
+#include "exprs/expr_context.h"
+#include "fmt/format.h"
+#include "gen_cpp/function_service.pb.h"
+#include "runtime/runtime_state.h"
+#include "runtime/user_function_cache.h"
+#include "service/brpc.h"
+#include "util/brpc_client_cache.h"
+
+namespace doris {
+
+RPCFnCall::RPCFnCall(const TExprNode& node) : Expr(node), _fn_context_index(-1) {
+    DCHECK_EQ(_fn.binary_type, TFunctionBinaryType::RPC);
+}
+
+Status RPCFnCall::prepare(RuntimeState* state, const RowDescriptor& desc, ExprContext* context) {
+    RETURN_IF_ERROR(Expr::prepare(state, desc, context));
+    DCHECK(!_fn.scalar_fn.symbol.empty());
+
+    FunctionContext::TypeDesc return_type = AnyValUtil::column_type_to_type_desc(_type);
+    std::vector<FunctionContext::TypeDesc> arg_types;
+    bool char_arg = false;
+    for (int i = 0; i < _children.size(); ++i) {
+        arg_types.push_back(AnyValUtil::column_type_to_type_desc(_children[i]->type()));
+        char_arg = char_arg || (_children[i]->type().type == TYPE_CHAR);
+    }
+    _fn_context_index = context->register_func(state, return_type, arg_types, 0);
+
+    // _fn.scalar_fn.symbol
+    _rpc_function_symbol = _fn.scalar_fn.symbol;
+
+    _client = state->exec_env()->brpc_function_client_cache()->get_client(_fn.hdfs_location);
+
+    if (_client == nullptr) {
+        return Status::InternalError(
+                fmt::format("rpc env init error: {}/{}", _fn.hdfs_location, _rpc_function_symbol));

Review comment:
       `hdfs_location`?

##########
File path: be/src/exprs/rpc_fn_call.cpp
##########
@@ -0,0 +1,332 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exprs/rpc_fn_call.h"
+
+#include "exprs/anyval_util.h"
+#include "exprs/expr_context.h"
+#include "fmt/format.h"
+#include "gen_cpp/function_service.pb.h"
+#include "runtime/runtime_state.h"
+#include "runtime/user_function_cache.h"
+#include "service/brpc.h"
+#include "util/brpc_client_cache.h"
+
+namespace doris {
+
+RPCFnCall::RPCFnCall(const TExprNode& node) : Expr(node), _fn_context_index(-1) {
+    DCHECK_EQ(_fn.binary_type, TFunctionBinaryType::RPC);
+}
+
+Status RPCFnCall::prepare(RuntimeState* state, const RowDescriptor& desc, ExprContext* context) {
+    RETURN_IF_ERROR(Expr::prepare(state, desc, context));
+    DCHECK(!_fn.scalar_fn.symbol.empty());
+
+    FunctionContext::TypeDesc return_type = AnyValUtil::column_type_to_type_desc(_type);
+    std::vector<FunctionContext::TypeDesc> arg_types;
+    bool char_arg = false;
+    for (int i = 0; i < _children.size(); ++i) {
+        arg_types.push_back(AnyValUtil::column_type_to_type_desc(_children[i]->type()));
+        char_arg = char_arg || (_children[i]->type().type == TYPE_CHAR);
+    }
+    _fn_context_index = context->register_func(state, return_type, arg_types, 0);
+
+    // _fn.scalar_fn.symbol
+    _rpc_function_symbol = _fn.scalar_fn.symbol;
+
+    _client = state->exec_env()->brpc_function_client_cache()->get_client(_fn.hdfs_location);
+
+    if (_client == nullptr) {
+        return Status::InternalError(
+                fmt::format("rpc env init error: {}/{}", _fn.hdfs_location, _rpc_function_symbol));
+    }
+    return Status::OK();
+}
+
+Status RPCFnCall::open(RuntimeState* state, ExprContext* ctx,
+                       FunctionContext::FunctionStateScope scope) {
+    RETURN_IF_ERROR(Expr::open(state, ctx, scope));
+    return Status::OK();
+}
+
+void RPCFnCall::close(RuntimeState* state, ExprContext* context,
+                      FunctionContext::FunctionStateScope scope) {
+    Expr::close(state, context, scope);
+}
+
+Status RPCFnCall::_eval_children(ExprContext* context, TupleRow* row,
+                                 PFunctionCallResponse* response) {
+    PFunctionCallRequest request;
+    request.set_function_name(_rpc_function_symbol);
+    int64_t name_hash = 0;
+    murmur_hash3_x64_64(_rpc_function_symbol.c_str(), _rpc_function_symbol.size(), 21217891,
+                        &name_hash);
+    request.set_name_hash(name_hash);
+    for (int i = 0; i < _children.size(); ++i) {
+        PValues* arg = request.add_args();
+        void* src_slot = context->get_value(_children[i], row);
+        PGenericType* ptype = arg->mutable_type();
+        if (src_slot == nullptr) {
+            arg->set_has_null(true);
+            arg->add_null_map(true);
+        } else {
+            arg->set_has_null(false);
+        }
+        switch (_children[i]->type().type) {
+        case TYPE_BOOLEAN: {
+            ptype->set_id(PGenericType::BOOLEAN);
+            arg->add_bool_value(*(bool*)src_slot);
+            break;
+        }
+        case TYPE_TINYINT: {
+            ptype->set_id(PGenericType::INT8);
+            arg->add_int32_value(*(int8_t*)src_slot);
+            break;
+        }
+        case TYPE_SMALLINT: {
+            ptype->set_id(PGenericType::INT16);
+            arg->add_int32_value(*(int16_t*)src_slot);
+            break;
+        }
+        case TYPE_INT: {
+            ptype->set_id(PGenericType::INT32);
+            arg->add_int32_value(*(int*)src_slot);
+            break;
+        }
+        case TYPE_BIGINT: {
+            ptype->set_id(PGenericType::INT64);
+            arg->add_int64_value(*(int64_t*)src_slot);
+            break;
+        }
+        case TYPE_LARGEINT: {
+            ptype->set_id(PGenericType::INT128);
+            char buffer[sizeof(__int128)];
+            memcpy(buffer, src_slot, sizeof(__int128));
+            arg->add_bytes_value(buffer, sizeof(__int128));
+            break;
+        }
+        case TYPE_DOUBLE: {
+            ptype->set_id(PGenericType::DOUBLE);
+            arg->add_double_value(*(double*)src_slot);
+            break;
+        }
+        case TYPE_FLOAT: {
+            ptype->set_id(PGenericType::FLOAT);
+            arg->add_float_value(*(float*)src_slot);
+            break;
+        }
+        case TYPE_VARCHAR:
+        case TYPE_STRING:
+        case TYPE_CHAR: {
+            ptype->set_id(PGenericType::STRING);
+            StringValue value = *reinterpret_cast<StringValue*>(src_slot);
+            arg->add_string_value(value.ptr, value.len);
+            break;
+        }
+        case TYPE_HLL: {
+            ptype->set_id(PGenericType::HLL);
+            StringValue value = *reinterpret_cast<StringValue*>(src_slot);
+            arg->add_string_value(value.ptr, value.len);
+            break;
+        }
+        case TYPE_OBJECT: {
+            ptype->set_id(PGenericType::BITMAP);
+            StringValue value = *reinterpret_cast<StringValue*>(src_slot);
+            arg->add_string_value(value.ptr, value.len);
+            break;
+        }
+        case TYPE_DECIMALV2: {
+            ptype->set_id(PGenericType::DECIMAL128);
+            ptype->mutable_decimal_type()->set_precision(_children[i]->type().precision);
+            ptype->mutable_decimal_type()->set_scale(_children[i]->type().scale);
+            char buffer[sizeof(__int128)];
+            memcpy(buffer, src_slot, sizeof(__int128));
+            arg->add_bytes_value(buffer, sizeof(__int128));
+            break;
+        }
+        case TYPE_DATE: {
+            ptype->set_id(PGenericType::DATE);
+            const auto* time_val = (const DateTimeValue*)(src_slot);
+            PDateTime* date_time = arg->add_datetime_value();
+            date_time->set_day(time_val->day());
+            date_time->set_month(time_val->month());
+            date_time->set_year(time_val->year());
+            break;
+        }
+        case TYPE_DATETIME: {
+            ptype->set_id(PGenericType::DATETIME);
+            const auto* time_val = (const DateTimeValue*)(src_slot);
+            PDateTime* date_time = arg->add_datetime_value();
+            date_time->set_day(time_val->day());
+            date_time->set_month(time_val->month());
+            date_time->set_year(time_val->year());
+            date_time->set_hour(time_val->hour());
+            date_time->set_minute(time_val->minute());
+            date_time->set_second(time_val->second());
+            date_time->set_microsecond(time_val->microsecond());
+            break;
+        }
+        case TYPE_TIME: {
+            ptype->set_id(PGenericType::DATETIME);
+            const auto* time_val = (const DateTimeValue*)(src_slot);
+            PDateTime* date_time = arg->add_datetime_value();
+            date_time->set_hour(time_val->hour());
+            date_time->set_minute(time_val->minute());
+            date_time->set_second(time_val->second());
+            date_time->set_microsecond(time_val->microsecond());
+            break;
+        }
+        default: {
+            FunctionContext* fn_ctx = context->fn_context(_fn_context_index);
+            fn_ctx->set_error(
+                    fmt::format("data time not supported: {}", _children[i]->type().type).c_str());
+            break;
+        }
+        }
+    }
+
+    brpc::Controller cntl;
+    _client->fn_call(&cntl, &request, response, nullptr);
+    if (cntl.Failed()) {
+        FunctionContext* fn_ctx = context->fn_context(_fn_context_index);
+        fn_ctx->set_error(cntl.ErrorText().c_str());
+        return Status::InternalError(fmt::format("call to rpc function {} failed: {}",
+                                                 _rpc_function_symbol, cntl.ErrorText())
+                                             .c_str());
+    }
+    if (response->status().status_code() != 0) {
+        FunctionContext* fn_ctx = context->fn_context(_fn_context_index);
+        fn_ctx->set_error(response->status().DebugString().c_str());
+        return Status::InternalError(fmt::format("call to rpc function {} failed: {}",
+                                                 _rpc_function_symbol,
+                                                 response->status().DebugString()));
+    }
+    return Status::OK();
+}
+
+template <typename T>
+T RPCFnCall::interpret_eval(ExprContext* context, TupleRow* row) {
+    T res_val;
+    PFunctionCallResponse response;
+    Status st = _eval_children(context, row, &response);
+    if (!st.ok() || response.status().status_code() != 0 ||
+        (response.result().has_null() && response.result().null_map(0))) {
+        res_val.is_null = true;

Review comment:
       Is it OK to return null if the RPC fails?
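
One possible way to address this concern is to keep a failed call distinct from a genuine NULL result, so that only the latter becomes a null value. The sketch below is an illustration under that assumption; `RpcOutcome`, `EvalResult` and `to_eval_result` are hypothetical names and do not describe what the PR does.

```cpp
#include <string>

// Hypothetical summary of one remote call.
struct RpcOutcome {
    bool transport_failed = false; // e.g. what a failed controller would report
    int status_code = 0;           // status returned by the function service
    bool is_null = false;          // the function itself returned SQL NULL
    int value = 0;
    std::string error_text;
};

// What the expression hands back to the query engine in this sketch.
struct EvalResult {
    bool ok = true;
    std::string error;
    bool is_null = false;
    int value = 0;
};

EvalResult to_eval_result(const RpcOutcome& outcome) {
    EvalResult result;
    if (outcome.transport_failed || outcome.status_code != 0) {
        // A failure is reported as an error instead of being folded into NULL.
        result.ok = false;
        result.error = outcome.error_text.empty() ? "rpc function call failed"
                                                  : outcome.error_text;
        return result;
    }
    result.is_null = outcome.is_null;
    result.value = outcome.value;
    return result;
}
```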

##########
File path: gensrc/proto/types.proto
##########
@@ -63,3 +67,150 @@ message PUniqueId {
     required int64 lo = 2;
 };
 
+message PGenericType {
+    enum TypeId {
+        UINT8 = 0;
+        UINT16 = 1;
+        UINT32 = 2;
+        UINT64 = 3;
+        UINT128 = 4;
+        UINT256 = 5;
+        INT8 = 6;
+        INT16 = 7;
+        INT32 = 8;
+        INT64 = 9;
+        INT128 = 10;
+        INT256 = 11;
+        FLOAT = 12;
+        DOUBLE = 13;
+        BOOLEAN = 14;
+        DATE = 15;
+        DATETIME = 16;
+        HLL = 17;
+        BITMAP = 18;
+        LIST = 19;
+        MAP = 20;
+        STRUCT =21;
+        STRING = 22;
+        DECIMAL32 = 23;
+        DECIMAL64 = 24;
+        DECIMAL128 = 25;
+        BYTES = 26;
+        NOTHING = 27;
+        UNKNOWN = 999;
+    }
+    required TypeId id = 2;
+    optional PList list_type = 11;
+    optional PMap map_type = 12;
+    optional PStruct struct_type = 13;
+    optional PDecimal decimal_type = 14;
+}
+
+message PList {
+  required PGenericType element_type = 1;
+}
+
+message PMap {
+  required PGenericType key_type = 1;
+  required PGenericType value_type = 2;
+}
+
+message PField {
+  required PGenericType type = 1;
+  optional string name = 2;
+  optional string comment = 3;
+}
+
+message PStruct {
+  repeated PField fields = 1;
+  required string name = 2;
+}
+
+message PDecimal {
+  required uint32 precision = 1;
+  required uint32 scale = 2;
+}
+
+message PDateTime {
+    optional int32 year = 1;
+    optional int32 month = 2;
+    optional int32 day = 3;
+    optional int32 hour = 4;
+    optional int32 minute = 5;
+    optional int32 second = 6;
+    optional int32 microsecond = 7;
+}
+
+message PValue {
+    required PGenericType type = 1;
+    optional double double_value = 2;
+    optional float float_value = 3;
+    optional int32 int32_value = 4;
+    optional int64 int64_value = 5;
+    optional uint32 uint32_value = 6;
+    optional uint64 uint64_value = 7;
+    optional bool bool_value = 8;
+    optional string string_value = 9;
+    optional bytes bytes_value = 10;
+    optional PDateTime datetime_value = 11;
+    optional bool is_null  = 12 [default = false];
+}
+
+message PValues {
+    required PGenericType type = 1;
+    repeated double double_value = 2;
+    repeated float float_value = 3;
+    repeated int32 int32_value = 4;
+    repeated int64 int64_value = 5;
+    repeated uint32 uint32_value = 6;
+    repeated uint64 uint64_value = 7;
+    repeated bool bool_value = 8;
+    repeated string string_value = 9;
+    repeated bytes bytes_value = 10;
+    repeated PDateTime datetime_value = 11;
+    repeated bool null_map = 12;

Review comment:
       Could `null_map` and `has_null` be moved to the second and third positions?
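   For context on what renumbering would change on the wire: protobuf encodes each field key as a varint of `(field_number << 3) | wire_type`, so field numbers 1 through 15 take one byte and 16 and above take at least two. `null_map = 12` is therefore already in the one-byte range, and moving it would mainly help readability. A small sketch of that arithmetic (the helper names `varint_size` and `tag_size` are made up for illustration):

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// Size in bytes of a base-128 varint holding `value`.
std::size_t varint_size(std::uint64_t value) {
    std::size_t n = 1;
    while (value >= 0x80) {
        value >>= 7;
        ++n;
    }
    return n;
}

// A protobuf field key is the varint (field_number << 3) | wire_type.
std::size_t tag_size(std::uint32_t field_number, std::uint32_t wire_type) {
    return varint_size((static_cast<std::uint64_t>(field_number) << 3) | wire_type);
}
```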

##########
File path: gensrc/proto/types.proto
##########
@@ -63,3 +67,150 @@ message PUniqueId {
     required int64 lo = 2;
 };
 
+message PGenericType {

Review comment:
       The same data type is already defined in `data.proto`; how about unifying them?
   I will do this in PR #7939.

##########
File path: be/src/exprs/rpc_fn_call.cpp
##########
@@ -0,0 +1,332 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exprs/rpc_fn_call.h"
+
+#include "exprs/anyval_util.h"
+#include "exprs/expr_context.h"
+#include "fmt/format.h"
+#include "gen_cpp/function_service.pb.h"
+#include "runtime/runtime_state.h"
+#include "runtime/user_function_cache.h"
+#include "service/brpc.h"
+#include "util/brpc_client_cache.h"
+
+namespace doris {
+
+RPCFnCall::RPCFnCall(const TExprNode& node) : Expr(node), _fn_context_index(-1) {
+    DCHECK_EQ(_fn.binary_type, TFunctionBinaryType::RPC);
+}
+
+Status RPCFnCall::prepare(RuntimeState* state, const RowDescriptor& desc, ExprContext* context) {
+    RETURN_IF_ERROR(Expr::prepare(state, desc, context));
+    DCHECK(!_fn.scalar_fn.symbol.empty());
+
+    FunctionContext::TypeDesc return_type = AnyValUtil::column_type_to_type_desc(_type);
+    std::vector<FunctionContext::TypeDesc> arg_types;
+    bool char_arg = false;
+    for (int i = 0; i < _children.size(); ++i) {
+        arg_types.push_back(AnyValUtil::column_type_to_type_desc(_children[i]->type()));
+        char_arg = char_arg || (_children[i]->type().type == TYPE_CHAR);
+    }
+    _fn_context_index = context->register_func(state, return_type, arg_types, 0);
+
+    // _fn.scalar_fn.symbol
+    _rpc_function_symbol = _fn.scalar_fn.symbol;
+
+    _client = state->exec_env()->brpc_function_client_cache()->get_client(_fn.hdfs_location);
+
+    if (_client == nullptr) {
+        return Status::InternalError(
+                fmt::format("rpc env init error: {}/{}", _fn.hdfs_location, _rpc_function_symbol));
+    }
+    return Status::OK();
+}
+
+Status RPCFnCall::open(RuntimeState* state, ExprContext* ctx,
+                       FunctionContext::FunctionStateScope scope) {
+    RETURN_IF_ERROR(Expr::open(state, ctx, scope));
+    return Status::OK();
+}
+
+void RPCFnCall::close(RuntimeState* state, ExprContext* context,
+                      FunctionContext::FunctionStateScope scope) {
+    Expr::close(state, context, scope);
+}
+
+Status RPCFnCall::_eval_children(ExprContext* context, TupleRow* row,
+                                 PFunctionCallResponse* response) {
+    PFunctionCallRequest request;
+    request.set_function_name(_rpc_function_symbol);
+    int64_t name_hash = 0;
+    murmur_hash3_x64_64(_rpc_function_symbol.c_str(), _rpc_function_symbol.size(), 21217891,
+                        &name_hash);
+    request.set_name_hash(name_hash);
+    for (int i = 0; i < _children.size(); ++i) {
+        PValues* arg = request.add_args();
+        void* src_slot = context->get_value(_children[i], row);
+        PGenericType* ptype = arg->mutable_type();
+        if (src_slot == nullptr) {
+            arg->set_has_null(true);
+            arg->add_null_map(true);
+        } else {
+            arg->set_has_null(false);
+        }
+        switch (_children[i]->type().type) {
+        case TYPE_BOOLEAN: {
+            ptype->set_id(PGenericType::BOOLEAN);
+            arg->add_bool_value(*(bool*)src_slot);
+            break;
+        }
+        case TYPE_TINYINT: {
+            ptype->set_id(PGenericType::INT8);
+            arg->add_int32_value(*(int8_t*)src_slot);
+            break;
+        }
+        case TYPE_SMALLINT: {
+            ptype->set_id(PGenericType::INT16);
+            arg->add_int32_value(*(int16_t*)src_slot);
+            break;
+        }
+        case TYPE_INT: {
+            ptype->set_id(PGenericType::INT32);
+            arg->add_int32_value(*(int*)src_slot);
+            break;
+        }
+        case TYPE_BIGINT: {
+            ptype->set_id(PGenericType::INT64);
+            arg->add_int64_value(*(int64_t*)src_slot);
+            break;
+        }
+        case TYPE_LARGEINT: {
+            ptype->set_id(PGenericType::INT128);
+            char buffer[sizeof(__int128)];
+            memcpy(buffer, src_slot, sizeof(__int128));
+            arg->add_bytes_value(buffer, sizeof(__int128));
+            break;
+        }
+        case TYPE_DOUBLE: {
+            ptype->set_id(PGenericType::DOUBLE);
+            arg->add_double_value(*(double*)src_slot);
+            break;
+        }
+        case TYPE_FLOAT: {
+            ptype->set_id(PGenericType::FLOAT);
+            arg->add_float_value(*(float*)src_slot);
+            break;
+        }
+        case TYPE_VARCHAR:
+        case TYPE_STRING:
+        case TYPE_CHAR: {
+            ptype->set_id(PGenericType::STRING);
+            StringValue value = *reinterpret_cast<StringValue*>(src_slot);
+            arg->add_string_value(value.ptr, value.len);
+            break;
+        }
+        case TYPE_HLL: {
+            ptype->set_id(PGenericType::HLL);
+            StringValue value = *reinterpret_cast<StringValue*>(src_slot);
+            arg->add_string_value(value.ptr, value.len);
+            break;
+        }
+        case TYPE_OBJECT: {
+            ptype->set_id(PGenericType::BITMAP);
+            StringValue value = *reinterpret_cast<StringValue*>(src_slot);
+            arg->add_string_value(value.ptr, value.len);
+            break;
+        }
+        case TYPE_DECIMALV2: {
+            ptype->set_id(PGenericType::DECIMAL128);
+            ptype->mutable_decimal_type()->set_precision(_children[i]->type().precision);
+            ptype->mutable_decimal_type()->set_scale(_children[i]->type().scale);
+            char buffer[sizeof(__int128)];
+            memcpy(buffer, src_slot, sizeof(__int128));
+            arg->add_bytes_value(buffer, sizeof(__int128));
+            break;
+        }
+        case TYPE_DATE: {
+            ptype->set_id(PGenericType::DATE);
+            const auto* time_val = (const DateTimeValue*)(src_slot);
+            PDateTime* date_time = arg->add_datetime_value();
+            date_time->set_day(time_val->day());
+            date_time->set_month(time_val->month());
+            date_time->set_year(time_val->year());
+            break;
+        }
+        case TYPE_DATETIME: {
+            ptype->set_id(PGenericType::DATETIME);
+            const auto* time_val = (const DateTimeValue*)(src_slot);
+            PDateTime* date_time = arg->add_datetime_value();
+            date_time->set_day(time_val->day());
+            date_time->set_month(time_val->month());
+            date_time->set_year(time_val->year());
+            date_time->set_hour(time_val->hour());
+            date_time->set_minute(time_val->minute());
+            date_time->set_second(time_val->second());
+            date_time->set_microsecond(time_val->microsecond());
+            break;
+        }
+        case TYPE_TIME: {
+            ptype->set_id(PGenericType::DATETIME);
+            const auto* time_val = (const DateTimeValue*)(src_slot);
+            PDateTime* date_time = arg->add_datetime_value();
+            date_time->set_hour(time_val->hour());
+            date_time->set_minute(time_val->minute());
+            date_time->set_second(time_val->second());
+            date_time->set_microsecond(time_val->microsecond());
+            break;
+        }
+        default: {
+            FunctionContext* fn_ctx = context->fn_context(_fn_context_index);
+            fn_ctx->set_error(
+                    fmt::format("data type not supported: {}", _children[i]->type().type).c_str());
+            break;
+        }
+        }
+    }
+
+    brpc::Controller cntl;
+    _client->fn_call(&cntl, &request, response, nullptr);
+    if (cntl.Failed()) {
+        FunctionContext* fn_ctx = context->fn_context(_fn_context_index);
+        fn_ctx->set_error(cntl.ErrorText().c_str());
+        return Status::InternalError(fmt::format("call to rpc function {} failed: {}",
+                                                 _rpc_function_symbol, cntl.ErrorText())
+                                             .c_str());
+    }
+    if (response->status().status_code() != 0) {
+        FunctionContext* fn_ctx = context->fn_context(_fn_context_index);
+        fn_ctx->set_error(response->status().DebugString().c_str());
+        return Status::InternalError(fmt::format("call to rpc function {} failed: {}",
+                                                 _rpc_function_symbol,
+                                                 response->status().DebugString()));
+    }
+    return Status::OK();
+}
+
+template <typename T>
+T RPCFnCall::interpret_eval(ExprContext* context, TupleRow* row) {
+    T res_val;
+    PFunctionCallResponse response;
+    Status st = _eval_children(context, row, &response);
+    if (!st.ok() || response.status().status_code() != 0 ||

Review comment:
       The case `response.status().status_code() != 0` is already checked in `_eval_children()`,
   so there is no need to check it again here.
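   A quick way to convince oneself that the extra test is redundant, sketched with plain booleans instead of the real `Status` and response types (all function names below are hypothetical, and the NULL check is left out of both versions because it is unaffected):

```cpp
#include <cassert>

// Stand-in for what _eval_children() reports: it returns a non-OK status
// whenever the transport failed or the response status code is non-zero.
bool eval_children_ok(bool transport_failed, int status_code) {
    return !transport_failed && status_code == 0;
}

// Failure condition as written in the patch.
bool failed_original(bool transport_failed, int status_code) {
    bool ok = eval_children_ok(transport_failed, status_code);
    return !ok || status_code != 0;
}

// Failure condition with the duplicate status-code test removed.
bool failed_simplified(bool transport_failed, int status_code) {
    return !eval_children_ok(transport_failed, status_code);
}
```

   Both functions agree for every combination of inputs, so dropping the second test does not change behavior as long as `_eval_children()` keeps returning an error for non-zero status codes.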

##########
File path: gensrc/proto/types.proto
##########
@@ -63,3 +67,150 @@ message PUniqueId {
     required int64 lo = 2;
 };
 
+message PGenericType {
+    enum TypeId {
+        UINT8 = 0;
+        UINT16 = 1;
+        UINT32 = 2;
+        UINT64 = 3;
+        UINT128 = 4;
+        UINT256 = 5;
+        INT8 = 6;
+        INT16 = 7;
+        INT32 = 8;
+        INT64 = 9;
+        INT128 = 10;
+        INT256 = 11;
+        FLOAT = 12;
+        DOUBLE = 13;
+        BOOLEAN = 14;
+        DATE = 15;
+        DATETIME = 16;
+        HLL = 17;
+        BITMAP = 18;
+        LIST = 19;
+        MAP = 20;
+        STRUCT = 21;
+        STRING = 22;
+        DECIMAL32 = 23;
+        DECIMAL64 = 24;
+        DECIMAL128 = 25;
+        BYTES = 26;
+        NOTHING = 27;
+        UNKNOWN = 999;
+    }
+    required TypeId id = 2;
+    optional PList list_type = 11;
+    optional PMap map_type = 12;
+    optional PStruct struct_type = 13;
+    optional PDecimal decimal_type = 14;
+}
+
+message PList {
+  required PGenericType element_type = 1;
+}
+
+message PMap {
+  required PGenericType key_type = 1;
+  required PGenericType value_type = 2;
+}
+
+message PField {
+  required PGenericType type = 1;
+  optional string name = 2;
+  optional string comment = 3;
+}
+
+message PStruct {
+  repeated PField fields = 1;
+  required string name = 2;
+}
+
+message PDecimal {
+  required uint32 precision = 1;
+  required uint32 scale = 2;
+}
+
+message PDateTime {
+    optional int32 year = 1;
+    optional int32 month = 2;
+    optional int32 day = 3;
+    optional int32 hour = 4;
+    optional int32 minute = 5;
+    optional int32 second = 6;
+    optional int32 microsecond = 7;
+}
+
+message PValue {
+    required PGenericType type = 1;
+    optional double double_value = 2;
+    optional float float_value = 3;
+    optional int32 int32_value = 4;
+    optional int64 int64_value = 5;
+    optional uint32 uint32_value = 6;
+    optional uint64 uint64_value = 7;
+    optional bool bool_value = 8;
+    optional string string_value = 9;
+    optional bytes bytes_value = 10;
+    optional PDateTime datetime_value = 11;
+    optional bool is_null  = 12 [default = false];

Review comment:
       Would it be better to move `is_null` to the second position (field number 2)?

##########
File path: be/src/vec/functions/function_rpc.cpp
##########
@@ -0,0 +1,530 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/functions/function_rpc.h"
+
+#include <fmt/format.h>
+
+#include <memory>
+
+#include "gen_cpp/function_service.pb.h"
+#include "runtime/exec_env.h"
+#include "runtime/user_function_cache.h"
+#include "service/brpc.h"
+#include "util/brpc_client_cache.h"
+#include "vec/columns/column_vector.h"
+#include "vec/core/block.h"
+#include "vec/data_types/data_type_bitmap.h"
+#include "vec/data_types/data_type_date.h"
+#include "vec/data_types/data_type_date_time.h"
+#include "vec/data_types/data_type_decimal.h"
+#include "vec/data_types/data_type_nullable.h"
+#include "vec/data_types/data_type_number.h"
+#include "vec/data_types/data_type_string.h"
+
+namespace doris::vectorized {
+RPCFnCall::RPCFnCall(const std::string& symbol, const std::string& server,
+                     const DataTypes& argument_types, const DataTypePtr& return_type)
+        : _symbol(symbol),
+          _server(server),
+          _name(fmt::format("{}/{}", server, symbol)),
+          _argument_types(argument_types),
+          _return_type(return_type) {}
+Status RPCFnCall::prepare(FunctionContext* context, FunctionContext::FunctionStateScope scope) {
+    _client = ExecEnv::GetInstance()->brpc_function_client_cache()->get_client(_server);
+
+    if (_client == nullptr) {
+        return Status::InternalError("rpc env init error");
+    }
+    return Status::OK();
+}
+
+template <bool nullable>
+void convert_col_to_pvalue(const ColumnPtr& column, const DataTypePtr& data_type, PValues* arg,
+                           size_t row_count) {
+    PGenericType* ptype = arg->mutable_type();
+    switch (data_type->get_type_id()) {
+    case TypeIndex::UInt8: {
+        ptype->set_id(PGenericType::UINT8);
+        auto* values = arg->mutable_bool_value();
+        values->Reserve(row_count);
+        const auto* col = check_and_get_column<ColumnUInt8>(column);
+        auto& data = col->get_data();
+        values->Add(data.begin(), data.begin() + row_count);
+        break;
+    }
+    case TypeIndex::UInt16: {
+        ptype->set_id(PGenericType::UINT16);
+        auto* values = arg->mutable_uint32_value();
+        values->Reserve(row_count);
+        const auto* col = check_and_get_column<ColumnUInt16>(column);
+        auto& data = col->get_data();
+        values->Add(data.begin(), data.begin() + row_count);
+        break;
+    }
+    case TypeIndex::UInt32: {
+        ptype->set_id(PGenericType::UINT32);
+        auto* values = arg->mutable_uint32_value();
+        values->Reserve(row_count);
+        const auto* col = check_and_get_column<ColumnUInt32>(column);
+        auto& data = col->get_data();
+        values->Add(data.begin(), data.begin() + row_count);
+        break;
+    }
+    case TypeIndex::UInt64: {
+        ptype->set_id(PGenericType::UINT64);
+        auto* values = arg->mutable_uint64_value();
+        values->Reserve(row_count);
+        const auto* col = check_and_get_column<ColumnUInt64>(column);
+        auto& data = col->get_data();
+        values->Add(data.begin(), data.begin() + row_count);
+        break;
+    }
+    case TypeIndex::UInt128: {
+        ptype->set_id(PGenericType::UINT128);
+        arg->mutable_bytes_value()->Reserve(row_count);
+        for (size_t row_num = 0; row_num < row_count; ++row_num) {
+            if constexpr (nullable) {
+                if (column->is_null_at(row_num)) {
+                    arg->add_bytes_value(nullptr);
+                } else {
+                    StringRef data = column->get_data_at(row_num);
+                    arg->add_bytes_value(data.data, data.size);
+                }
+            } else {
+                StringRef data = column->get_data_at(row_num);
+                arg->add_bytes_value(data.data, data.size);
+            }
+        }
+        break;
+    }
+    case TypeIndex::Int8: {
+        ptype->set_id(PGenericType::INT8);
+        auto* values = arg->mutable_int32_value();
+        values->Reserve(row_count);
+        const auto* col = check_and_get_column<ColumnInt8>(column);
+        auto& data = col->get_data();
+        values->Add(data.begin(), data.begin() + row_count);
+        break;
+    }
+    case TypeIndex::Int16: {
+        ptype->set_id(PGenericType::INT16);
+        auto* values = arg->mutable_int32_value();
+        values->Reserve(row_count);
+        const auto* col = check_and_get_column<ColumnInt16>(column);
+        auto& data = col->get_data();
+        values->Add(data.begin(), data.begin() + row_count);
+        break;
+    }
+    case TypeIndex::Int32: {
+        ptype->set_id(PGenericType::INT32);
+        auto* values = arg->mutable_int32_value();
+        values->Reserve(row_count);
+        const auto* col = check_and_get_column<ColumnInt32>(column);
+        auto& data = col->get_data();
+        values->Add(data.begin(), data.begin() + row_count);
+        break;
+    }
+    case TypeIndex::Int64: {
+        ptype->set_id(PGenericType::INT64);
+        auto* values = arg->mutable_int64_value();
+        values->Reserve(row_count);
+        const auto* col = check_and_get_column<ColumnInt64>(column);
+        auto& data = col->get_data();
+        values->Add(data.begin(), data.begin() + row_count);
+        break;
+    }
+    case TypeIndex::Int128: {
+        ptype->set_id(PGenericType::INT128);
+        arg->mutable_bytes_value()->Reserve(row_count);
+        for (size_t row_num = 0; row_num < row_count; ++row_num) {
+            if constexpr (nullable) {
+                if (column->is_null_at(row_num)) {
+                    arg->add_bytes_value(nullptr);
+                } else {
+                    StringRef data = column->get_data_at(row_num);
+                    arg->add_bytes_value(data.data, data.size);
+                }
+            } else {
+                StringRef data = column->get_data_at(row_num);
+                arg->add_bytes_value(data.data, data.size);
+            }
+        }
+        break;
+    }
+    case TypeIndex::Float32: {
+        ptype->set_id(PGenericType::FLOAT);
+        auto* values = arg->mutable_float_value();
+        values->Reserve(row_count);
+        const auto* col = check_and_get_column<ColumnFloat32>(column);
+        auto& data = col->get_data();
+        values->Add(data.begin(), data.begin() + row_count);
+        break;
+    }
+
+    case TypeIndex::Float64: {
+        ptype->set_id(PGenericType::DOUBLE);
+        auto* values = arg->mutable_double_value();
+        values->Reserve(row_count);
+        const auto* col = check_and_get_column<ColumnFloat64>(column);
+        auto& data = col->get_data();
+        values->Add(data.begin(), data.begin() + row_count);
+        break;
+    }
+    case TypeIndex::Decimal128: {
+        ptype->set_id(PGenericType::DECIMAL128);
+        auto dec_type = std::reinterpret_pointer_cast<const DataTypeDecimal<Decimal128>>(data_type);
+        ptype->mutable_decimal_type()->set_precision(dec_type->get_precision());
+        ptype->mutable_decimal_type()->set_scale(dec_type->get_scale());
+        arg->mutable_bytes_value()->Reserve(row_count);
+        for (size_t row_num = 0; row_num < row_count; ++row_num) {
+            if constexpr (nullable) {
+                if (column->is_null_at(row_num)) {
+                    arg->add_bytes_value(nullptr);
+                } else {
+                    StringRef data = column->get_data_at(row_num);
+                    arg->add_bytes_value(data.data, data.size);
+                }
+            } else {
+                StringRef data = column->get_data_at(row_num);
+                arg->add_bytes_value(data.data, data.size);
+            }
+        }
+        break;
+    }
+    case TypeIndex::String: {
+        ptype->set_id(PGenericType::STRING);
+        arg->mutable_string_value()->Reserve(row_count);
+        for (size_t row_num = 0; row_num < row_count; ++row_num) {
+            if constexpr (nullable) {
+                if (column->is_null_at(row_num)) {
+                    arg->add_string_value(nullptr);
+                } else {
+                    StringRef data = column->get_data_at(row_num);
+                    arg->add_string_value(data.to_string());
+                }
+            } else {
+                StringRef data = column->get_data_at(row_num);
+                arg->add_string_value(data.to_string());
+            }
+        }
+        break;
+    }
+    case TypeIndex::Date: {
+        ptype->set_id(PGenericType::DATE);
+        arg->mutable_datetime_value()->Reserve(row_count);
+        for (size_t row_num = 0; row_num < row_count; ++row_num) {
+            PDateTime* date_time = arg->add_datetime_value();
+            if constexpr (nullable) {
+                if (!column->is_null_at(row_num)) {
+                    VecDateTimeValue v = VecDateTimeValue(column->get_int(row_num));
+                    date_time->set_day(v.day());
+                    date_time->set_month(v.month());
+                    date_time->set_year(v.year());
+                }
+            } else {
+                VecDateTimeValue v = VecDateTimeValue(column->get_int(row_num));
+                date_time->set_day(v.day());
+                date_time->set_month(v.month());
+                date_time->set_year(v.year());
+            }
+        }
+        break;
+    }
+    case TypeIndex::DateTime: {
+        ptype->set_id(PGenericType::DATETIME);
+        arg->mutable_datetime_value()->Reserve(row_count);
+        for (size_t row_num = 0; row_num < row_count; ++row_num) {
+            PDateTime* date_time = arg->add_datetime_value();
+            if constexpr (nullable) {
+                if (!column->is_null_at(row_num)) {
+                    VecDateTimeValue v = VecDateTimeValue(column->get_int(row_num));
+                    date_time->set_day(v.day());
+                    date_time->set_month(v.month());
+                    date_time->set_year(v.year());
+                    date_time->set_hour(v.hour());
+                    date_time->set_minute(v.minute());
+                    date_time->set_second(v.second());
+                }
+            } else {
+                VecDateTimeValue v = VecDateTimeValue(column->get_int(row_num));
+                date_time->set_day(v.day());
+                date_time->set_month(v.month());
+                date_time->set_year(v.year());
+                date_time->set_hour(v.hour());
+                date_time->set_minute(v.minute());
+                date_time->set_second(v.second());
+            }
+        }
+        break;
+    }
+    case TypeIndex::BitMap: {
+        ptype->set_id(PGenericType::BITMAP);
+        arg->mutable_bytes_value()->Reserve(row_count);
+        for (size_t row_num = 0; row_num < row_count; ++row_num) {
+            if constexpr (nullable) {
+                if (column->is_null_at(row_num)) {
+                    arg->add_bytes_value(nullptr);
+                } else {
+                    StringRef data = column->get_data_at(row_num);
+                    arg->add_bytes_value(data.data, data.size);
+                }
+            } else {
+                StringRef data = column->get_data_at(row_num);
+                arg->add_bytes_value(data.data, data.size);
+            }
+        }
+        break;
+    }
+    default:
+        LOG(INFO) << "unknown type: " << data_type->get_name();
+        ptype->set_id(PGenericType::UNKNOWN);
+        break;
+    }
+}
+
+void convert_nullable_col_to_pvalue(const ColumnPtr& column, const DataTypePtr& data_type,
+                                    const ColumnUInt8& null_col, PValues* arg, size_t row_count) {
+    if (column->has_null(row_count)) {
+        auto* null_map = arg->mutable_null_map();
+        null_map->Reserve(row_count);
+        const auto* col = check_and_get_column<ColumnUInt8>(null_col);
+        auto& data = col->get_data();
+        null_map->Add(data.begin(), data.begin() + row_count);
+        convert_col_to_pvalue<true>(column, data_type, arg, row_count);
+    } else {
+        convert_col_to_pvalue<false>(column, data_type, arg, row_count);
+    }
+}
+
+void convert_block_to_proto(Block& block, const ColumnNumbers& arguments, size_t input_rows_count,
+                            PFunctionCallRequest* request) {
+    size_t row_count = std::min(block.rows(), input_rows_count);
+    for (size_t col_idx : arguments) {
+        PValues* arg = request->add_args();
+        ColumnWithTypeAndName& column = block.get_by_position(col_idx);
+        arg->set_has_null(column.column->has_null(row_count));
+        auto col = column.column->convert_to_full_column_if_const();
+        if (auto* nullable = check_and_get_column<const ColumnNullable>(*col)) {
+            auto data_col = nullable->get_nested_column_ptr();
+            auto& null_col = nullable->get_null_map_column();
+            auto data_type = std::reinterpret_pointer_cast<const DataTypeNullable>(column.type);
+            convert_nullable_col_to_pvalue(data_col->convert_to_full_column_if_const(),
+                                           data_type->get_nested_type(), null_col, arg, row_count);
+        } else {
+            convert_col_to_pvalue<false>(col, column.type, arg, row_count);
+        }
+    }
+}
+
+template <bool nullable>
+void convert_to_column(MutableColumnPtr& column, const PValues& result) {
+    switch (result.type().id()) {
+    case PGenericType::UINT8: {
+        column->reserve(result.uint32_value_size());
+        column->resize(result.uint32_value_size());
+        auto& data = reinterpret_cast<ColumnUInt8*>(column.get())->get_data();
+        for (int i = 0; i < result.uint32_value_size(); ++i) {
+            data[i] = result.uint32_value(i);
+        }
+        break;
+    }
+    case PGenericType::UINT16: {
+        column->reserve(result.uint32_value_size());
+        column->resize(result.uint32_value_size());
+        auto& data = reinterpret_cast<ColumnUInt16*>(column.get())->get_data();
+        for (int i = 0; i < result.uint32_value_size(); ++i) {
+            data[i] = result.uint32_value(i);
+        }
+        break;
+    }
+    case PGenericType::UINT32: {
+        column->reserve(result.uint32_value_size());
+        column->resize(result.uint32_value_size());
+        auto& data = reinterpret_cast<ColumnUInt32*>(column.get())->get_data();
+        for (int i = 0; i < result.uint32_value_size(); ++i) {
+            data[i] = result.uint32_value(i);
+        }
+        break;
+    }
+    case PGenericType::UINT64: {
+        column->reserve(result.uint64_value_size());
+        column->resize(result.uint64_value_size());
+        auto& data = reinterpret_cast<ColumnUInt64*>(column.get())->get_data();
+        for (int i = 0; i < result.uint64_value_size(); ++i) {
+            data[i] = result.uint64_value(i);
+        }
+        break;
+    }
+    case PGenericType::INT8: {
+        column->reserve(result.int32_value_size());
+        column->resize(result.int32_value_size());
+        auto& data = reinterpret_cast<ColumnInt8*>(column.get())->get_data();
+        for (int i = 0; i < result.int32_value_size(); ++i) {
+            data[i] = result.int32_value(i);
+        }
+        break;
+    }
+    case PGenericType::INT16: {
+        column->reserve(result.int32_value_size());
+        column->resize(result.int32_value_size());
+        auto& data = reinterpret_cast<ColumnInt16*>(column.get())->get_data();
+        for (int i = 0; i < result.int32_value_size(); ++i) {
+            data[i] = result.int32_value(i);
+        }
+        break;
+    }
+    case PGenericType::INT32: {
+        column->reserve(result.int32_value_size());
+        column->resize(result.int32_value_size());
+        auto& data = reinterpret_cast<ColumnInt32*>(column.get())->get_data();
+        for (int i = 0; i < result.int32_value_size(); ++i) {
+            data[i] = result.int32_value(i);
+        }
+        break;
+    }
+    case PGenericType::INT64: {
+        column->reserve(result.int64_value_size());
+        column->resize(result.int64_value_size());
+        auto& data = reinterpret_cast<ColumnInt64*>(column.get())->get_data();
+        for (int i = 0; i < result.int64_value_size(); ++i) {
+            data[i] = result.int64_value(i);
+        }
+        break;
+    }
+    case PGenericType::DATE:
+    case PGenericType::DATETIME: {
+        column->reserve(result.datetime_value_size());
+        column->resize(result.datetime_value_size());
+        auto& data = reinterpret_cast<ColumnInt64*>(column.get())->get_data();
+        for (int i = 0; i < result.datetime_value_size(); ++i) {
+            VecDateTimeValue v;
+            PDateTime pv = result.datetime_value(i);
+            v.set_time(pv.year(), pv.month(), pv.day(), pv.hour(), pv.minute(), pv.second());
+            data[i] = binary_cast<VecDateTimeValue, Int64>(v);
+        }
+        break;
+    }
+    case PGenericType::FLOAT: {
+        column->reserve(result.float_value_size());
+        column->resize(result.float_value_size());
+        auto& data = reinterpret_cast<ColumnFloat32*>(column.get())->get_data();
+        for (int i = 0; i < result.float_value_size(); ++i) {
+            data[i] = result.float_value(i);
+        }
+        break;
+    }
+    case PGenericType::DOUBLE: {
+        column->reserve(result.double_value_size());
+        column->resize(result.double_value_size());
+        auto& data = reinterpret_cast<ColumnFloat64*>(column.get())->get_data();
+        for (int i = 0; i < result.double_value_size(); ++i) {
+            data[i] = result.double_value(i);
+        }
+        break;
+    }
+    case PGenericType::INT128: {
+        column->reserve(result.bytes_value_size());
+        column->resize(result.bytes_value_size());
+        auto& data = reinterpret_cast<ColumnInt128*>(column.get())->get_data();
+        for (int i = 0; i < result.bytes_value_size(); ++i) {
+            data[i] = *(int128_t*)(result.bytes_value(i).c_str());
+        }
+        break;
+    }
+    case PGenericType::STRING: {
+        column->reserve(result.string_value_size());
+        for (int i = 0; i < result.string_value_size(); ++i) {
+            column->insert_data(result.string_value(i).c_str(), result.string_value(i).size());
+        }
+        break;
+    }
+    case PGenericType::DECIMAL128: {
+        column->reserve(result.bytes_value_size());
+        column->resize(result.bytes_value_size());
+        auto& data = reinterpret_cast<ColumnDecimal128*>(column.get())->get_data();
+        for (int i = 0; i < result.bytes_value_size(); ++i) {
+            data[i] = *(int128_t*)(result.bytes_value(i).c_str());
+        }
+        break;
+    }
+    case PGenericType::BITMAP: {
+        column->reserve(result.bytes_value_size());
+        for (int i = 0; i < result.bytes_value_size(); ++i) {
+            column->insert_data(result.bytes_value(i).c_str(), result.bytes_value(i).size());
+        }
+        break;
+    }
+    default: {
+        LOG(WARNING) << "unknown PGenericType: " << result.type().DebugString();
+        break;
+    }
+    }
+}
+
+void convert_to_block(Block& block, const PValues& result, size_t pos) {
+    auto data_type = block.get_data_type(pos);
+    if (data_type->is_nullable()) {
+        auto null_type = std::reinterpret_pointer_cast<const DataTypeNullable>(data_type);
+        auto data_col = null_type->get_nested_type()->create_column();
+        convert_to_column<true>(data_col, result);
+        auto null_col = ColumnUInt8::create(data_col->size(), 0);
+        auto& null_map_data = null_col->get_data();
+        null_col->reserve(data_col->size());
+        null_col->resize(data_col->size());
+        if (result.has_null()) {
+            for (int i = 0; i < data_col->size(); ++i) {
+                null_map_data[i] = result.null_map(i);
+            }
+        } else {
+            for (int i = 0; i < data_col->size(); ++i) {
+                null_map_data[i] = false;
+            }
+        }
+        block.replace_by_position(
+                pos, std::move(ColumnNullable::create(std::move(data_col), std::move(null_col))));
+    } else {
+        auto column = data_type->create_column();
+        convert_to_column<false>(column, result);
+        block.replace_by_position(pos, std::move(column));
+    }
+}
+
+Status RPCFnCall::execute(FunctionContext* context, Block& block, const ColumnNumbers& arguments,
+                          size_t result, size_t input_rows_count, bool dry_run) {
+    PFunctionCallRequest request;
+    PFunctionCallResponse response;
+    request.set_function_name(_symbol);
+    int64_t name_hash = 0;
+    murmur_hash3_x64_64(_symbol.c_str(), _symbol.size(), 21217891, &name_hash);

Review comment:
       Does `murmur_hash3_x64_64` work for ARM64?
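
On the portability question: in the public MurmurHash3 reference code, the "x64" variants are the ones tuned for 64-bit platforms in general, not for x86-64 specifically, and their building blocks are ordinary 64-bit integer operations. The sketch below shows those building blocks under that assumption. The names `rotl64`, `fmix64` and `load_u64` are illustrative and are not taken from the Doris source; whether Doris's `murmur_hash3_x64_64` reads its input in an alignment-safe way would need to be checked in its own implementation.

```cpp
#include <cstddef>
#include <cstdint>
#include <cstring>

// Rotate left using plain shifts (no x86 intrinsics).
// Valid for 1 <= r <= 63; r == 0 or r == 64 would shift by 64 bits, which is undefined.
inline uint64_t rotl64(uint64_t x, int r) {
    return (x << r) | (x >> (64 - r));
}

// 64-bit finalization mix from the public MurmurHash3 reference code.
// Unsigned multiplication wraps modulo 2^64 on every conforming platform.
inline uint64_t fmix64(uint64_t k) {
    k ^= k >> 33;
    k *= 0xff51afd7ed558ccdULL;
    k ^= k >> 33;
    k *= 0xc4ceb9fe1a85ec53ULL;
    k ^= k >> 33;
    return k;
}

// Alignment-safe 8-byte load. Casting an arbitrary char pointer to uint64_t*
// and dereferencing it is the part that is not portable; memcpy avoids that.
// The value is read in host byte order.
inline uint64_t load_u64(const void* p) {
    uint64_t v;
    std::memcpy(&v, p, sizeof(v));
    return v;
}
```

The arithmetic gives the same result on any 64-bit target. What can differ between platforms is how input bytes are loaded (alignment and byte order), so that is the part worth reviewing for ARM64.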




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] morningman commented on pull request #7519: [feature] Support udf through RPC

Posted by GitBox <gi...@apache.org>.
morningman commented on pull request #7519:
URL: https://github.com/apache/incubator-doris/pull/7519#issuecomment-1031154316


   Documentation needs to be added later.




[GitHub] [incubator-doris] stalary commented on pull request #7519: [feature] Support udf through RPC

Posted by GitBox <gi...@apache.org>.
stalary commented on pull request #7519:
URL: https://github.com/apache/incubator-doris/pull/7519#issuecomment-1008003672


   `_brpc_stub_cache` is split into `_internal_client_cache` and `_function_client_cache`. Does this introduce any compatibility problems?

