You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by yi...@apache.org on 2023/01/13 01:21:45 UTC

[doris] branch master updated: [refactor](rpc fn) decouple vectorized remote function from row-based one (#15871)

This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 174e5e601f [refactor](rpc fn) decouple vectorized remote function from row-based one (#15871)
174e5e601f is described below

commit 174e5e601fca57941e644d4200b68c118a325b9b
Author: Gabriel <ga...@gmail.com>
AuthorDate: Fri Jan 13 09:21:33 2023 +0800

    [refactor](rpc fn) decouple vectorized remote function from row-based one (#15871)
---
 be/src/vec/functions/function_rpc.cpp | 505 +++++++++++++++++++++++++++++++++-
 be/src/vec/functions/function_rpc.h   |  42 ++-
 2 files changed, 538 insertions(+), 9 deletions(-)

diff --git a/be/src/vec/functions/function_rpc.cpp b/be/src/vec/functions/function_rpc.cpp
index 97d31710ca..2dfcfce8a7 100644
--- a/be/src/vec/functions/function_rpc.cpp
+++ b/be/src/vec/functions/function_rpc.cpp
@@ -21,17 +21,516 @@
 
 #include <memory>
 
-#include "exprs/rpc_fn.h"
+#include "gen_cpp/Exprs_types.h"
+#include "json2pb/json_to_pb.h"
+#include "json2pb/pb_to_json.h"
 
 namespace doris::vectorized {
+
+// Builds the RPC function wrapper from its thrift description: records the remote symbol name,
+// the server address and a human-readable signature (used in error messages), then fetches the
+// brpc stub for that address from the process-wide function client cache.
+RPCFnImpl::RPCFnImpl(const TFunction& fn) : _fn(fn) {
+    _function_name = _fn.scalar_fn.symbol;
+    _server_addr = _fn.hdfs_location;
+    _signature = fmt::format("{}: [{}/{}]", _fn.name.function_name, _fn.hdfs_location,
+                             _fn.scalar_fn.symbol);
+    // The stub lookup is keyed by _server_addr, so it must run only after the address has been
+    // assigned; looking it up earlier would query the cache with an empty string.
+    _client = ExecEnv::GetInstance()->brpc_function_client_cache()->get_client(_server_addr);
+}
+
+// Serializes rows [start, end) of a nullable argument into `arg`. `column` holds the values and
+// `null_col` the per-row null flags. If has_null() reports nulls, the flags are copied into the
+// PValues null_map and the values go through the nullable conversion; otherwise the
+// non-nullable conversion is used and no null_map is written.
+void RPCFnImpl::_convert_nullable_col_to_pvalue(const ColumnPtr& column,
+                                                const DataTypePtr& data_type,
+                                                const ColumnUInt8& null_col, PValues* arg,
+                                                int start, int end) {
+    const int num_rows = end - start;
+    // NOTE(review): has_null() is asked of the value column rather than of null_col — confirm
+    // that this is the intended source of truth for "does this argument contain nulls".
+    if (!column->has_null(num_rows)) {
+        this->_convert_col_to_pvalue<false>(column, data_type, arg, start, end);
+        return;
+    }
+
+    auto* pb_null_map = arg->mutable_null_map();
+    pb_null_map->Reserve(num_rows);
+    const auto* flag_col = check_and_get_column<ColumnUInt8>(null_col);
+    const auto& flags = flag_col->get_data();
+    pb_null_map->Add(flags.begin() + start, flags.begin() + end);
+    this->_convert_col_to_pvalue<true>(column, data_type, arg, start, end);
+}
+
+// Serializes rows [start, end) of `column` into `arg` and tags `arg` with the PGenericType that
+// matches `data_type`:
+//  - fixed-width integer and float columns are bulk-copied into a repeated numeric field
+//    (UInt8 -> bool_value, UInt16/UInt32 -> uint32_value, Int8/Int16/Int32 -> int32_value, ...);
+//  - UInt128, Int128, BitMap and HLL rows are appended to bytes_value;
+//  - String rows are appended to string_value;
+//  - Date and DateTime rows are broken down into PDateTime entries;
+//  - any other type is logged and the argument is tagged PGenericType::UNKNOWN.
+// When `nullable` is true, a row for which column->is_null_at() holds is sent as an empty
+// placeholder (empty bytes/string or a default PDateTime) so later rows keep their positions.
+// The numeric bulk copies do not look at null-ness.
+template <bool nullable>
+void RPCFnImpl::_convert_col_to_pvalue(const ColumnPtr& column, const DataTypePtr& data_type,
+                                       PValues* arg, int start, int end) {
+    int row_count = end - start;
+    PGenericType* ptype = arg->mutable_type();
+
+    // Bulk-copies rows [start, end) of a fixed-width column into a repeated numeric field.
+    auto append_numbers = [&](auto* values, const auto* typed_col) {
+        values->Reserve(row_count);
+        const auto& data = typed_col->get_data();
+        values->Add(data.begin() + start, data.begin() + end);
+    };
+
+    // Appends the raw bytes of every row in [start, end) to bytes_value.
+    auto append_bytes = [&]() {
+        arg->mutable_bytes_value()->Reserve(row_count);
+        for (size_t row_num = start; row_num < end; ++row_num) {
+            if constexpr (nullable) {
+                if (column->is_null_at(row_num)) {
+                    // Add an empty element; handing a null char pointer to the protobuf
+                    // setter would construct a std::string from nullptr.
+                    arg->add_bytes_value();
+                    continue;
+                }
+            }
+            StringRef data = column->get_data_at(row_num);
+            arg->add_bytes_value(data.data, data.size);
+        }
+    };
+
+    switch (data_type->get_type_id()) {
+    case TypeIndex::UInt8: {
+        ptype->set_id(PGenericType::UINT8);
+        append_numbers(arg->mutable_bool_value(), check_and_get_column<ColumnUInt8>(column));
+        break;
+    }
+    case TypeIndex::UInt16: {
+        ptype->set_id(PGenericType::UINT16);
+        append_numbers(arg->mutable_uint32_value(), check_and_get_column<ColumnUInt16>(column));
+        break;
+    }
+    case TypeIndex::UInt32: {
+        ptype->set_id(PGenericType::UINT32);
+        append_numbers(arg->mutable_uint32_value(), check_and_get_column<ColumnUInt32>(column));
+        break;
+    }
+    case TypeIndex::UInt64: {
+        ptype->set_id(PGenericType::UINT64);
+        append_numbers(arg->mutable_uint64_value(), check_and_get_column<ColumnUInt64>(column));
+        break;
+    }
+    case TypeIndex::UInt128: {
+        ptype->set_id(PGenericType::UINT128);
+        append_bytes();
+        break;
+    }
+    case TypeIndex::Int8: {
+        ptype->set_id(PGenericType::INT8);
+        append_numbers(arg->mutable_int32_value(), check_and_get_column<ColumnInt8>(column));
+        break;
+    }
+    case TypeIndex::Int16: {
+        ptype->set_id(PGenericType::INT16);
+        append_numbers(arg->mutable_int32_value(), check_and_get_column<ColumnInt16>(column));
+        break;
+    }
+    case TypeIndex::Int32: {
+        ptype->set_id(PGenericType::INT32);
+        append_numbers(arg->mutable_int32_value(), check_and_get_column<ColumnInt32>(column));
+        break;
+    }
+    case TypeIndex::Int64: {
+        ptype->set_id(PGenericType::INT64);
+        append_numbers(arg->mutable_int64_value(), check_and_get_column<ColumnInt64>(column));
+        break;
+    }
+    case TypeIndex::Int128: {
+        ptype->set_id(PGenericType::INT128);
+        append_bytes();
+        break;
+    }
+    case TypeIndex::Float32: {
+        ptype->set_id(PGenericType::FLOAT);
+        append_numbers(arg->mutable_float_value(), check_and_get_column<ColumnFloat32>(column));
+        break;
+    }
+    case TypeIndex::Float64: {
+        ptype->set_id(PGenericType::DOUBLE);
+        append_numbers(arg->mutable_double_value(), check_and_get_column<ColumnFloat64>(column));
+        break;
+    }
+    case TypeIndex::String: {
+        ptype->set_id(PGenericType::STRING);
+        // Reserve the field that is actually filled below (string_value, not bytes_value).
+        arg->mutable_string_value()->Reserve(row_count);
+        for (size_t row_num = start; row_num < end; ++row_num) {
+            if constexpr (nullable) {
+                if (column->is_null_at(row_num)) {
+                    // Empty placeholder; see append_bytes for why nullptr is not passed.
+                    arg->add_string_value();
+                    continue;
+                }
+            }
+            StringRef data = column->get_data_at(row_num);
+            arg->add_string_value(data.to_string());
+        }
+        break;
+    }
+    case TypeIndex::Date: {
+        ptype->set_id(PGenericType::DATE);
+        arg->mutable_datetime_value()->Reserve(row_count);
+        for (size_t row_num = start; row_num < end; ++row_num) {
+            // The entry is added unconditionally so a null row leaves a default PDateTime.
+            PDateTime* date_time = arg->add_datetime_value();
+            if constexpr (nullable) {
+                if (column->is_null_at(row_num)) {
+                    continue;
+                }
+            }
+            VecDateTimeValue v =
+                    VecDateTimeValue::create_from_olap_date(column->get_int(row_num));
+            date_time->set_day(v.day());
+            date_time->set_month(v.month());
+            date_time->set_year(v.year());
+        }
+        break;
+    }
+    case TypeIndex::DateTime: {
+        ptype->set_id(PGenericType::DATETIME);
+        arg->mutable_datetime_value()->Reserve(row_count);
+        for (size_t row_num = start; row_num < end; ++row_num) {
+            // The entry is added unconditionally so a null row leaves a default PDateTime.
+            PDateTime* date_time = arg->add_datetime_value();
+            if constexpr (nullable) {
+                if (column->is_null_at(row_num)) {
+                    continue;
+                }
+            }
+            VecDateTimeValue v =
+                    VecDateTimeValue::create_from_olap_datetime(column->get_int(row_num));
+            date_time->set_day(v.day());
+            date_time->set_month(v.month());
+            date_time->set_year(v.year());
+            date_time->set_hour(v.hour());
+            date_time->set_minute(v.minute());
+            date_time->set_second(v.second());
+        }
+        break;
+    }
+    case TypeIndex::BitMap: {
+        ptype->set_id(PGenericType::BITMAP);
+        append_bytes();
+        break;
+    }
+    case TypeIndex::HLL: {
+        ptype->set_id(PGenericType::HLL);
+        append_bytes();
+        break;
+    }
+    default:
+        LOG(INFO) << "unknown type: " << data_type->get_name();
+        ptype->set_id(PGenericType::UNKNOWN);
+        break;
+    }
+}
+
+// Fills `column` from the PValues returned by the remote function, choosing the source field
+// and the concrete column type from result.type().id():
+//  - integer/float ids resize the column to the number of received values and copy them;
+//  - DATE/DATETIME rebuild a VecDateTimeValue per entry and store its Int64 representation;
+//  - INT128/DECIMAL128 read each 128-bit value out of a bytes_value entry;
+//  - STRING/BITMAP/HLL insert each entry through insert_data();
+//  - any other id is logged and leaves `column` untouched.
+// The `nullable` template parameter is not consulted in this function.
+template <bool nullable>
+void RPCFnImpl::_convert_to_column(MutableColumnPtr& column, const PValues& result) {
+    // Sizes `column` to the number of received values, then copies them one by one into the
+    // fixed-width column type that `typed_col` views it as.
+    auto fill_numbers = [&](auto* typed_col, const auto& values) {
+        column->reserve(values.size());
+        column->resize(values.size());
+        auto& data = typed_col->get_data();
+        for (int i = 0; i < values.size(); ++i) {
+            data[i] = values.Get(i);
+        }
+    };
+
+    switch (result.type().id()) {
+    case PGenericType::UINT8: {
+        // NOTE(review): the request side sends UInt8 data in bool_value, while the reply is
+        // read from uint32_value — confirm this matches what the server returns.
+        fill_numbers(reinterpret_cast<ColumnUInt8*>(column.get()), result.uint32_value());
+        break;
+    }
+    case PGenericType::UINT16: {
+        fill_numbers(reinterpret_cast<ColumnUInt16*>(column.get()), result.uint32_value());
+        break;
+    }
+    case PGenericType::UINT32: {
+        fill_numbers(reinterpret_cast<ColumnUInt32*>(column.get()), result.uint32_value());
+        break;
+    }
+    case PGenericType::UINT64: {
+        fill_numbers(reinterpret_cast<ColumnUInt64*>(column.get()), result.uint64_value());
+        break;
+    }
+    case PGenericType::INT8: {
+        // Must be viewed as ColumnInt8: going through a wider element type would store two
+        // bytes per row into a column that holds one byte per row.
+        fill_numbers(reinterpret_cast<ColumnInt8*>(column.get()), result.int32_value());
+        break;
+    }
+    case PGenericType::INT16: {
+        fill_numbers(reinterpret_cast<ColumnInt16*>(column.get()), result.int32_value());
+        break;
+    }
+    case PGenericType::INT32: {
+        fill_numbers(reinterpret_cast<ColumnInt32*>(column.get()), result.int32_value());
+        break;
+    }
+    case PGenericType::INT64: {
+        fill_numbers(reinterpret_cast<ColumnInt64*>(column.get()), result.int64_value());
+        break;
+    }
+    case PGenericType::DATE:
+    case PGenericType::DATETIME: {
+        column->reserve(result.datetime_value_size());
+        column->resize(result.datetime_value_size());
+        auto& data = reinterpret_cast<ColumnInt64*>(column.get())->get_data();
+        for (int i = 0; i < result.datetime_value_size(); ++i) {
+            VecDateTimeValue v;
+            const PDateTime& pv = result.datetime_value(i);
+            // The last argument is the seconds component.
+            v.set_time(pv.year(), pv.month(), pv.day(), pv.hour(), pv.minute(), pv.second());
+            data[i] = binary_cast<VecDateTimeValue, Int64>(v);
+        }
+        break;
+    }
+    case PGenericType::FLOAT: {
+        fill_numbers(reinterpret_cast<ColumnFloat32*>(column.get()), result.float_value());
+        break;
+    }
+    case PGenericType::DOUBLE: {
+        fill_numbers(reinterpret_cast<ColumnFloat64*>(column.get()), result.double_value());
+        break;
+    }
+    case PGenericType::INT128: {
+        column->reserve(result.bytes_value_size());
+        column->resize(result.bytes_value_size());
+        auto& data = reinterpret_cast<ColumnInt128*>(column.get())->get_data();
+        for (int i = 0; i < result.bytes_value_size(); ++i) {
+            // NOTE(review): assumes every entry carries at least sizeof(int128_t) bytes and
+            // that an unaligned 128-bit load is acceptable — neither is checked here.
+            data[i] = *(int128_t*)(result.bytes_value(i).c_str());
+        }
+        break;
+    }
+    case PGenericType::STRING: {
+        column->reserve(result.string_value_size());
+        for (int i = 0; i < result.string_value_size(); ++i) {
+            column->insert_data(result.string_value(i).c_str(), result.string_value(i).size());
+        }
+        break;
+    }
+    case PGenericType::DECIMAL128: {
+        column->reserve(result.bytes_value_size());
+        column->resize(result.bytes_value_size());
+        auto& data = reinterpret_cast<ColumnDecimal128*>(column.get())->get_data();
+        for (int i = 0; i < result.bytes_value_size(); ++i) {
+            // NOTE(review): same size/alignment assumption as the INT128 case.
+            data[i] = *(int128_t*)(result.bytes_value(i).c_str());
+        }
+        break;
+    }
+    case PGenericType::BITMAP:
+    case PGenericType::HLL: {
+        column->reserve(result.bytes_value_size());
+        for (int i = 0; i < result.bytes_value_size(); ++i) {
+            column->insert_data(result.bytes_value(i).c_str(), result.bytes_value(i).size());
+        }
+        break;
+    }
+    default: {
+        LOG(WARNING) << "unknown PGenericType: " << result.type().DebugString();
+        break;
+    }
+    }
+}
+
+// Runs the remote function for one block: packs the argument columns into a
+// PFunctionCallRequest, issues the call through the brpc stub and, on success, writes the
+// first returned PValues into column `result` of `block`. Returns InternalError when the RPC
+// itself fails, when the response has no status or no result, or when the returned status
+// code is non-zero.
+Status RPCFnImpl::vec_call(FunctionContext* context, Block& block, const ColumnNumbers& arguments,
+                           size_t result, size_t input_rows_count) {
+    PFunctionCallRequest call_request;
+    call_request.set_function_name(_function_name);
+    _convert_block_to_proto(block, arguments, input_rows_count, &call_request);
+
+    PFunctionCallResponse call_response;
+    brpc::Controller controller;
+    // No completion closure is supplied; the response is inspected immediately afterwards.
+    _client->fn_call(&controller, &call_request, &call_response, nullptr);
+
+    if (controller.Failed()) {
+        return Status::InternalError("call to rpc function {} failed: {}", _signature,
+                                     controller.ErrorText());
+    }
+
+    const bool response_usable = call_response.has_status() && call_response.result_size() != 0;
+    if (!response_usable) {
+        return Status::InternalError("call rpc function {} failed: status or result is not set.",
+                                     _signature);
+    }
+
+    const auto& remote_status = call_response.status();
+    if (remote_status.status_code() != 0) {
+        return Status::InternalError("call to rpc function {} failed: {}", _signature,
+                                     remote_status.DebugString());
+    }
+
+    _convert_to_block(block, call_response.result(0), result);
+    return Status::OK();
+}
+
+// Appends one PValues argument to `request` for every column index in `arguments`. At most
+// min(block.rows(), input_rows_count) rows are serialized. Const columns are expanded to full
+// columns first; nullable columns are split into nested values plus null map, everything else
+// goes through the non-nullable conversion.
+void RPCFnImpl::_convert_block_to_proto(Block& block, const ColumnNumbers& arguments,
+                                        size_t input_rows_count, PFunctionCallRequest* request) {
+    const size_t num_rows = std::min(block.rows(), input_rows_count);
+    for (size_t arg_pos : arguments) {
+        PValues* pb_arg = request->add_args();
+        ColumnWithTypeAndName& entry = block.get_by_position(arg_pos);
+        pb_arg->set_has_null(entry.column->has_null(num_rows));
+
+        auto full_col = entry.column->convert_to_full_column_if_const();
+        auto* nullable_col = check_and_get_column<const ColumnNullable>(*full_col);
+        if (nullable_col == nullptr) {
+            _convert_col_to_pvalue<false>(full_col, entry.type, pb_arg, 0, num_rows);
+            continue;
+        }
+
+        auto nested_col = nullable_col->get_nested_column_ptr();
+        auto& null_flags = nullable_col->get_null_map_column();
+        // NOTE(review): presumably entry.type is a DataTypeNullable whenever the column is
+        // nullable; the pointer cast below does not verify that.
+        auto nullable_type = std::reinterpret_pointer_cast<const DataTypeNullable>(entry.type);
+        _convert_nullable_col_to_pvalue(nested_col->convert_to_full_column_if_const(),
+                                        nullable_type->get_nested_type(), null_flags, pb_arg, 0,
+                                        num_rows);
+    }
+}
+
+// Replaces column `pos` of `block` with the values carried by `result`. For a nullable result
+// type the values are decoded into the nested column and wrapped in a ColumnNullable whose
+// null map is taken from result.null_map() when result.has_null() is set; otherwise every row
+// is marked non-null. For a non-nullable type the decoded column is stored directly.
+void RPCFnImpl::_convert_to_block(Block& block, const PValues& result, size_t pos) {
+    auto data_type = block.get_data_type(pos);
+    if (!data_type->is_nullable()) {
+        auto column = data_type->create_column();
+        _convert_to_column<false>(column, result);
+        block.replace_by_position(pos, std::move(column));
+        return;
+    }
+
+    auto null_type = std::reinterpret_pointer_cast<const DataTypeNullable>(data_type);
+    auto data_col = null_type->get_nested_type()->create_column();
+    _convert_to_column<true>(data_col, result);
+
+    const size_t row_count = data_col->size();
+    auto null_col = ColumnUInt8::create(row_count, 0);
+    auto& null_map_data = null_col->get_data();
+    // The reply comes from a remote server, so only as many flags as it actually carries are
+    // read; rows without a flag are treated as non-null instead of indexing past the end of
+    // the repeated field.
+    const size_t flag_count =
+            result.has_null() ? std::min<size_t>(row_count, result.null_map_size()) : 0;
+    for (size_t i = 0; i < row_count; ++i) {
+        null_map_data[i] = i < flag_count ? result.null_map(static_cast<int>(i)) : false;
+    }
+    block.replace_by_position(pos,
+                              ColumnNullable::create(std::move(data_col), std::move(null_col)));
+}
+
 // Stores the argument types, the return type and the thrift function descriptor. No other
 // work is done in the constructor.
 FunctionRPC::FunctionRPC(const TFunction& fn, const DataTypes& argument_types,
                          const DataTypePtr& return_type)
         : _argument_types(argument_types), _return_type(return_type), _tfn(fn) {}
 
 Status FunctionRPC::prepare(FunctionContext* context, FunctionContext::FunctionStateScope scope) {
-    _fn = std::make_unique<RPCFn>(_tfn, false);
+    _fn = std::make_unique<RPCFnImpl>(_tfn);
 
-    if (!_fn->avliable()) {
+    if (!_fn->available()) {
         return Status::InternalError("rpc env init error");
     }
     return Status::OK();
diff --git a/be/src/vec/functions/function_rpc.h b/be/src/vec/functions/function_rpc.h
index a4037958dd..56d953744f 100644
--- a/be/src/vec/functions/function_rpc.h
+++ b/be/src/vec/functions/function_rpc.h
@@ -17,12 +17,43 @@
 
 #pragma once
 
+#include "gen_cpp/function_service.pb.h"
+#include "util/brpc_client_cache.h"
 #include "vec/functions/function.h"
 
-namespace doris {
-class RPCFn;
+namespace doris::vectorized {
+
+// Client-side helper that evaluates a remote (RPC) function on vectorized blocks: it converts
+// argument columns to protobuf PValues, calls the function service through a brpc stub and
+// converts the returned PValues back into a column.
+class RPCFnImpl {
+public:
+    // `fn` is the thrift description of the remote function. Marked explicit so a TFunction
+    // is never converted to an RPCFnImpl implicitly.
+    explicit RPCFnImpl(const TFunction& fn);
+    ~RPCFnImpl() = default;
+    // Calls the remote function with the columns of `block` selected by `arguments` and
+    // stores the outcome in column `result`. Returns a non-OK Status on failure.
+    Status vec_call(FunctionContext* context, vectorized::Block& block,
+                    const std::vector<size_t>& arguments, size_t result, size_t input_rows_count);
+    // True when a stub for the function service was obtained.
+    bool available() const { return _client != nullptr; }
+
+private:
+    // Serializes the argument columns of `block` into `request`.
+    void _convert_block_to_proto(vectorized::Block& block,
+                                 const vectorized::ColumnNumbers& arguments,
+                                 size_t input_rows_count, PFunctionCallRequest* request);
+    // Writes `result` into column `pos` of `block`.
+    void _convert_to_block(vectorized::Block& block, const PValues& result, size_t pos);
+    // Serializes rows [start, end) of a nullable argument (values in `column`, null flags in
+    // `null_col`) into `arg`.
+    void _convert_nullable_col_to_pvalue(const vectorized::ColumnPtr& column,
+                                         const vectorized::DataTypePtr& data_type,
+                                         const vectorized::ColumnUInt8& null_col, PValues* arg,
+                                         int start, int end);
+    // Serializes rows [start, end) of `column` into `arg` according to `data_type`.
+    template <bool nullable>
+    void _convert_col_to_pvalue(const vectorized::ColumnPtr& column,
+                                const vectorized::DataTypePtr& data_type, PValues* arg, int start,
+                                int end);
+    // Fills `column` from the values carried by `result`.
+    template <bool nullable>
+    void _convert_to_column(vectorized::MutableColumnPtr& column, const PValues& result);
+
+    // Stub used to reach the function service; null when unavailable.
+    std::shared_ptr<PFunctionService_Stub> _client;
+    // Symbol of the remote function, sent with every request.
+    std::string _function_name;
+    // Address of the function server, used to look up the stub.
+    std::string _server_addr;
+    // Readable "name: [address/symbol]" tag used in error messages.
+    std::string _signature;
+    // Copy of the thrift function description this object was built from.
+    TFunction _fn;
+};
 
-namespace vectorized {
 class FunctionRPC : public IFunctionBase {
 public:
     FunctionRPC(const TFunction& fn, const DataTypes& argument_types,
@@ -64,8 +95,7 @@ private:
     DataTypes _argument_types;
     DataTypePtr _return_type;
     TFunction _tfn;
-    std::unique_ptr<RPCFn> _fn;
+    std::unique_ptr<RPCFnImpl> _fn;
 };
 
-} // namespace vectorized
-} // namespace doris
+} // namespace doris::vectorized


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org