You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2022/05/14 02:03:59 UTC
[incubator-doris] branch master updated: [refactor](es) Clean es tcp scannode and related thrift definitions (#9553)
This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new cd105bee0a [refactor](es) Clean es tcp scannode and related thrift definitions (#9553)
cd105bee0a is described below
commit cd105bee0afe4bb00a872562e547f72753e7e43a
Author: yiguolei <67...@qq.com>
AuthorDate: Sat May 14 10:03:55 2022 +0800
[refactor](es) Clean es tcp scannode and related thrift definitions (#9553)
PaloExternalDataSourceService is designed for es_scan_node, which uses the ES TCP protocol.
However, the ES TCP protocol requires deploying a TCP jar into ES. Both the ES and Lucene versions have since been upgraded,
and the TCP jar is no longer maintained.
Therefore, this commit removes all the related code and Thrift definitions.
---
be/src/exec/CMakeLists.txt | 1 -
be/src/exec/es/es_predicate.h | 1 -
be/src/exec/es_scan_node.cpp | 868 ---------------------
be/src/exec/es_scan_node.h | 88 ---
be/src/exec/exec_node.cpp | 6 -
be/src/exprs/expr_context.h | 1 -
be/src/gen_cpp/CMakeLists.txt | 3 -
be/src/runtime/client_cache.h | 3 -
be/src/runtime/exec_env.h | 10 -
be/src/runtime/exec_env_init.cpp | 5 -
be/src/util/thrift_rpc_helper.cpp | 5 -
be/test/CMakeLists.txt | 1 -
be/test/exec/es_scan_node_test.cpp | 147 ----
gensrc/thrift/PaloExternalDataSourceService.thrift | 250 ------
14 files changed, 1389 deletions(-)
diff --git a/be/src/exec/CMakeLists.txt b/be/src/exec/CMakeLists.txt
index 709cb4de84..202ae767b0 100644
--- a/be/src/exec/CMakeLists.txt
+++ b/be/src/exec/CMakeLists.txt
@@ -57,7 +57,6 @@ set(EXEC_FILES
csv_scan_node.cpp
csv_scanner.cpp
table_function_node.cpp
- es_scan_node.cpp
es_http_scan_node.cpp
es_http_scanner.cpp
es/es_predicate.cpp
diff --git a/be/src/exec/es/es_predicate.h b/be/src/exec/es/es_predicate.h
index 28552d3eee..05c4e2b946 100644
--- a/be/src/exec/es/es_predicate.h
+++ b/be/src/exec/es/es_predicate.h
@@ -23,7 +23,6 @@
#include "exprs/slot_ref.h"
#include "gen_cpp/Exprs_types.h"
#include "gen_cpp/Opcodes_types.h"
-#include "gen_cpp/PaloExternalDataSourceService_types.h"
#include "runtime/descriptors.h"
#include "runtime/primitive_type.h"
#include "runtime/tuple.h"
diff --git a/be/src/exec/es_scan_node.cpp b/be/src/exec/es_scan_node.cpp
deleted file mode 100644
index a0a89d3a68..0000000000
--- a/be/src/exec/es_scan_node.cpp
+++ /dev/null
@@ -1,868 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "es_scan_node.h"
-
-#include <gutil/strings/substitute.h>
-
-#include <boost/algorithm/string.hpp>
-#include <string>
-
-#include "exprs/expr.h"
-#include "exprs/expr_context.h"
-#include "exprs/in_predicate.h"
-#include "exprs/slot_ref.h"
-#include "gen_cpp/Exprs_types.h"
-#include "gen_cpp/PlanNodes_types.h"
-#include "olap/olap_common.h"
-#include "olap/utils.h"
-#include "runtime/client_cache.h"
-#include "runtime/row_batch.h"
-#include "runtime/runtime_state.h"
-#include "runtime/string_value.h"
-#include "runtime/tuple_row.h"
-#include "service/backend_options.h"
-#include "util/debug_util.h"
-#include "util/runtime_profile.h"
-
-namespace doris {
-
-// $0 = column type (e.g. INT)
-const std::string ERROR_INVALID_COL_DATA =
- "Data source returned inconsistent column data. "
- "Expected value of type $0 based on column metadata. This likely indicates a "
- "problem with the data source library.";
-const std::string ERROR_MEM_LIMIT_EXCEEDED =
- "DataSourceScanNode::$0() failed to allocate "
- "$1 bytes for $2.";
-
-EsScanNode::EsScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
- : ScanNode(pool, tnode, descs),
- _tuple_id(tnode.es_scan_node.tuple_id),
- _tuple_desc(nullptr),
- _env(nullptr),
- _scan_range_idx(0) {
- if (tnode.es_scan_node.__isset.properties) {
- _properties = tnode.es_scan_node.properties;
- }
-}
-
-EsScanNode::~EsScanNode() {}
-
-Status EsScanNode::prepare(RuntimeState* state) {
- VLOG_CRITICAL << "EsScanNode::Prepare";
-
- RETURN_IF_ERROR(ScanNode::prepare(state));
- SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
- _tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id);
- if (_tuple_desc == nullptr) {
- std::stringstream ss;
- ss << "es tuple descriptor is null, _tuple_id=" << _tuple_id;
- LOG(WARNING) << ss.str();
- return Status::InternalError(ss.str());
- }
- _env = state->exec_env();
-
- return Status::OK();
-}
-
-Status EsScanNode::open(RuntimeState* state) {
- VLOG_CRITICAL << "EsScanNode::Open";
-
- RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::OPEN));
- RETURN_IF_CANCELLED(state);
- SCOPED_TIMER(_runtime_profile->total_time_counter());
- SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
- RETURN_IF_ERROR(ExecNode::open(state));
-
- // TExtOpenParams.row_schema
- std::vector<TExtColumnDesc> cols;
- for (const SlotDescriptor* slot : _tuple_desc->slots()) {
- TExtColumnDesc col;
- col.__set_name(slot->col_name());
- col.__set_type(slot->type().to_thrift());
- cols.emplace_back(std::move(col));
- }
- TExtTableSchema row_schema;
- row_schema.cols = std::move(cols);
- row_schema.__isset.cols = true;
-
- // TExtOpenParams.predicates
- std::vector<vector<TExtPredicate>> predicates;
- std::vector<int> predicate_to_conjunct;
- for (int i = 0; i < _conjunct_ctxs.size(); ++i) {
- VLOG_CRITICAL << "conjunct: " << _conjunct_ctxs[i]->root()->debug_string();
- std::vector<TExtPredicate> disjuncts;
- if (get_disjuncts(_conjunct_ctxs[i], _conjunct_ctxs[i]->root(), disjuncts)) {
- predicates.emplace_back(std::move(disjuncts));
- predicate_to_conjunct.push_back(i);
- }
- }
-
- // open every scan range
- std::vector<int> conjunct_accepted_times(_conjunct_ctxs.size(), 0);
- for (int i = 0; i < _scan_ranges.size(); ++i) {
- TEsScanRange& es_scan_range = _scan_ranges[i];
-
- if (es_scan_range.es_hosts.empty()) {
- std::stringstream ss;
- ss << "es fail to open: hosts empty";
- LOG(WARNING) << ss.str();
- return Status::InternalError(ss.str());
- }
-
- // TExtOpenParams
- TExtOpenParams params;
- params.__set_query_id(state->query_id());
- _properties["index"] = es_scan_range.index;
- if (es_scan_range.__isset.type) {
- _properties["type"] = es_scan_range.type;
- }
- _properties["shard_id"] = std::to_string(es_scan_range.shard_id);
- params.__set_properties(_properties);
- params.__set_row_schema(row_schema);
- params.__set_batch_size(state->batch_size());
- params.__set_predicates(predicates);
- TExtOpenResult result;
-
- // choose an es node, local is the first choice
- std::string localhost = BackendOptions::get_localhost();
- bool is_success = false;
- for (int j = 0; j < 2; ++j) {
- for (auto& es_host : es_scan_range.es_hosts) {
- if ((j == 0 && es_host.hostname != localhost) ||
- (j == 1 && es_host.hostname == localhost)) {
- continue;
- }
- Status status = open_es(es_host, result, params);
- if (status.ok()) {
- is_success = true;
- _addresses.push_back(es_host);
- _scan_handles.push_back(result.scan_handle);
- if (result.__isset.accepted_conjuncts) {
- for (int index : result.accepted_conjuncts) {
- conjunct_accepted_times[predicate_to_conjunct[index]]++;
- }
- }
- break;
- } else if (status.code() == TStatusCode::ES_SHARD_NOT_FOUND) {
- // if shard not found, try other nodes
- LOG(WARNING) << "shard not found on es node: "
- << ", address=" << es_host << ", scan_range_idx=" << i
- << ", try other nodes";
- } else {
- LOG(WARNING) << "es open error: scan_range_idx=" << i << ", address=" << es_host
- << ", msg=" << status.get_error_msg();
- return status;
- }
- }
- if (is_success) {
- break;
- }
- }
-
- if (!is_success) {
- std::stringstream ss;
- ss << "es open error: scan_range_idx=" << i << ", can't find shard on any node";
- return Status::InternalError(ss.str());
- }
- }
-
- // remove those conjuncts that accepted by all scan ranges
- for (int i = predicate_to_conjunct.size() - 1; i >= 0; i--) {
- int conjunct_index = predicate_to_conjunct[i];
- if (conjunct_accepted_times[conjunct_index] == _scan_ranges.size()) {
- _pushdown_conjunct_ctxs.push_back(*(_conjunct_ctxs.begin() + conjunct_index));
- _conjunct_ctxs.erase(_conjunct_ctxs.begin() + conjunct_index);
- }
- }
-
- for (int i = 0; i < _conjunct_ctxs.size(); ++i) {
- if (!check_left_conjuncts(_conjunct_ctxs[i]->root())) {
- return Status::InternalError(
- "esquery could only be executed on es, but could not push down to es");
- }
- }
-
- return Status::OK();
-}
-
-Status EsScanNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) {
- VLOG_CRITICAL << "EsScanNode::GetNext";
-
- RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::GETNEXT));
- RETURN_IF_CANCELLED(state);
- SCOPED_TIMER(_runtime_profile->total_time_counter());
- SCOPED_SWITCH_TASK_THREAD_LOCAL_EXISTED_MEM_TRACKER(mem_tracker());
-
- // create tuple
- MemPool* tuple_pool = row_batch->tuple_data_pool();
- int64_t tuple_buffer_size;
- uint8_t* tuple_buffer = nullptr;
- RETURN_IF_ERROR(
- row_batch->resize_and_allocate_tuple_buffer(state, &tuple_buffer_size, &tuple_buffer));
- Tuple* tuple = reinterpret_cast<Tuple*>(tuple_buffer);
-
- // get batch
- TExtGetNextResult result;
- RETURN_IF_ERROR(get_next_from_es(result));
- _offsets[_scan_range_idx] += result.rows.num_rows;
-
- // convert
- VLOG_CRITICAL << "begin to convert: scan_range_idx=" << _scan_range_idx
- << ", num_rows=" << result.rows.num_rows;
- std::vector<TExtColumnData>& cols = result.rows.cols;
- // indexes of the next non-null value in the row batch, per column.
- std::vector<int> cols_next_val_idx(_tuple_desc->slots().size(), 0);
- for (int row_idx = 0; row_idx < result.rows.num_rows; row_idx++) {
- if (reached_limit()) {
- *eos = true;
- break;
- }
- RETURN_IF_ERROR(materialize_row(tuple_pool, tuple, cols, row_idx, cols_next_val_idx));
- TupleRow* tuple_row = row_batch->get_row(row_batch->add_row());
- tuple_row->set_tuple(0, tuple);
- if (ExecNode::eval_conjuncts(_conjunct_ctxs.data(), _conjunct_ctxs.size(), tuple_row)) {
- row_batch->commit_last_row();
- tuple = reinterpret_cast<Tuple*>(reinterpret_cast<uint8_t*>(tuple) +
- _tuple_desc->byte_size());
- ++_num_rows_returned;
- }
- }
-
- VLOG_CRITICAL << "finish one batch: num_rows=" << row_batch->num_rows();
- COUNTER_SET(_rows_returned_counter, _num_rows_returned);
- if (result.__isset.eos && result.eos) {
- VLOG_CRITICAL << "es finish one scan_range: scan_range_idx=" << _scan_range_idx;
- ++_scan_range_idx;
- }
- if (_scan_range_idx == _scan_ranges.size()) {
- *eos = true;
- }
-
- return Status::OK();
-}
-
-Status EsScanNode::close(RuntimeState* state) {
- if (is_closed()) return Status::OK();
- VLOG_CRITICAL << "EsScanNode::Close";
- RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::CLOSE));
- SCOPED_TIMER(_runtime_profile->total_time_counter());
- Expr::close(_pushdown_conjunct_ctxs, state);
- RETURN_IF_ERROR(ExecNode::close(state));
- for (int i = 0; i < _addresses.size(); ++i) {
- TExtCloseParams params;
- params.__set_scan_handle(_scan_handles[i]);
- TExtCloseResult result;
-
-#ifndef BE_TEST
- const TNetworkAddress& address = _addresses[i];
- try {
- Status status;
- ExtDataSourceServiceClientCache* client_cache = _env->extdatasource_client_cache();
- ExtDataSourceServiceConnection client(client_cache, address, 10000, &status);
- if (!status.ok()) {
- LOG(WARNING) << "es create client error: scan_range_idx=" << i
- << ", address=" << address << ", msg=" << status.get_error_msg();
- return status;
- }
-
- try {
- VLOG_CRITICAL << "es close param=" << apache::thrift::ThriftDebugString(params);
- client->close(result, params);
- } catch (apache::thrift::transport::TTransportException& e) {
- LOG(WARNING) << "es close retrying, because: " << e.what();
- RETURN_IF_ERROR(client.reopen());
- client->close(result, params);
- }
- } catch (apache::thrift::TException& e) {
- std::stringstream ss;
- ss << "es close error: scan_range_idx=" << i << ", msg=" << e.what();
- LOG(WARNING) << ss.str();
- return Status::ThriftRpcError(ss.str());
- }
-
- VLOG_CRITICAL << "es close result=" << apache::thrift::ThriftDebugString(result);
- Status status(result.status);
- if (!status.ok()) {
- LOG(WARNING) << "es close error: : scan_range_idx=" << i
- << ", msg=" << status.get_error_msg();
- return status;
- }
-#else
- TStatus status;
- result.__set_status(status);
-#endif
- }
-
- return Status::OK();
-}
-
-void EsScanNode::debug_string(int indentation_level, std::stringstream* out) const {
- *out << string(indentation_level * 2, ' ');
- *out << "EsScanNode(tupleid=" << _tuple_id;
- *out << ")" << std::endl;
-
- for (int i = 0; i < _children.size(); ++i) {
- _children[i]->debug_string(indentation_level + 1, out);
- }
-}
-
-Status EsScanNode::set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) {
- for (int i = 0; i < scan_ranges.size(); ++i) {
- TScanRangeParams scan_range = scan_ranges[i];
- DCHECK(scan_range.scan_range.__isset.es_scan_range);
- TEsScanRange es_scan_range = scan_range.scan_range.es_scan_range;
- _scan_ranges.push_back(es_scan_range);
- }
-
- _offsets.resize(scan_ranges.size(), 0);
- return Status::OK();
-}
-
-Status EsScanNode::open_es(TNetworkAddress& address, TExtOpenResult& result,
- TExtOpenParams& params) {
- VLOG_CRITICAL << "es open param=" << apache::thrift::ThriftDebugString(params);
-#ifndef BE_TEST
- try {
- ExtDataSourceServiceClientCache* client_cache = _env->extdatasource_client_cache();
- Status status;
- ExtDataSourceServiceConnection client(client_cache, address, 10000, &status);
- if (!status.ok()) {
- std::stringstream ss;
- ss << "es create client error: address=" << address
- << ", msg=" << status.get_error_msg();
- return Status::InternalError(ss.str());
- }
-
- try {
- client->open(result, params);
- } catch (apache::thrift::transport::TTransportException& e) {
- LOG(WARNING) << "es open retrying, because: " << e.what();
- RETURN_IF_ERROR(client.reopen());
- client->open(result, params);
- }
- VLOG_CRITICAL << "es open result=" << apache::thrift::ThriftDebugString(result);
- return Status(result.status);
- } catch (apache::thrift::TException& e) {
- std::stringstream ss;
- ss << "es open error: address=" << address << ", msg=" << e.what();
- return Status::InternalError(ss.str());
- }
-#else
- TStatus status;
- result.__set_status(status);
- result.__set_scan_handle("0");
- return Status(status);
-#endif
-}
-
-// legacy conjuncts must not contain match function
-bool EsScanNode::check_left_conjuncts(Expr* conjunct) {
- if (is_match_func(conjunct)) {
- return false;
- } else {
- int num_children = conjunct->get_num_children();
- for (int child_idx = 0; child_idx < num_children; ++child_idx) {
- if (!check_left_conjuncts(conjunct->get_child(child_idx))) {
- return false;
- }
- }
- return true;
- }
-}
-
-bool EsScanNode::ignore_cast(SlotDescriptor* slot, Expr* expr) {
- if (slot->type().is_date_type() && expr->type().is_date_type()) {
- return true;
- }
- if (slot->type().is_string_type() && expr->type().is_string_type()) {
- return true;
- }
- return false;
-}
-
-bool EsScanNode::get_disjuncts(ExprContext* context, Expr* conjunct,
- std::vector<TExtPredicate>& disjuncts) {
- if (TExprNodeType::BINARY_PRED == conjunct->node_type()) {
- if (conjunct->children().size() != 2) {
- VLOG_CRITICAL << "get disjuncts fail: number of children is not 2";
- return false;
- }
- SlotRef* slotRef;
- TExprOpcode::type op;
- Expr* expr;
- if (TExprNodeType::SLOT_REF == conjunct->get_child(0)->node_type()) {
- expr = conjunct->get_child(1);
- slotRef = (SlotRef*)(conjunct->get_child(0));
- op = conjunct->op();
- } else if (TExprNodeType::SLOT_REF == conjunct->get_child(1)->node_type()) {
- expr = conjunct->get_child(0);
- slotRef = (SlotRef*)(conjunct->get_child(1));
- op = conjunct->op();
- } else {
- VLOG_CRITICAL << "get disjuncts fail: no SLOT_REF child";
- return false;
- }
-
- SlotDescriptor* slot_desc = get_slot_desc(slotRef);
- if (slot_desc == nullptr) {
- VLOG_CRITICAL << "get disjuncts fail: slot_desc is null";
- return false;
- }
-
- TExtLiteral literal;
- if (!to_ext_literal(context, expr, &literal)) {
- VLOG_CRITICAL << "get disjuncts fail: can't get literal, node_type="
- << expr->node_type();
- return false;
- }
-
- TExtColumnDesc columnDesc;
- columnDesc.__set_name(slot_desc->col_name());
- columnDesc.__set_type(slot_desc->type().to_thrift());
- TExtBinaryPredicate binaryPredicate;
- binaryPredicate.__set_col(columnDesc);
- binaryPredicate.__set_op(op);
- binaryPredicate.__set_value(std::move(literal));
- TExtPredicate predicate;
- predicate.__set_node_type(TExprNodeType::BINARY_PRED);
- predicate.__set_binary_predicate(binaryPredicate);
- disjuncts.push_back(std::move(predicate));
- return true;
- } else if (is_match_func(conjunct)) {
- // if this is a function call expr and function name is match, then push
- // down it to es
- TExtFunction match_function;
- match_function.__set_func_name(conjunct->fn().name.function_name);
- std::vector<TExtLiteral> query_conditions;
-
- TExtLiteral literal;
- if (!to_ext_literal(context, conjunct->get_child(1), &literal)) {
- VLOG_CRITICAL << "get disjuncts fail: can't get literal, node_type="
- << conjunct->get_child(1)->node_type();
- return false;
- }
-
- query_conditions.push_back(std::move(literal));
- match_function.__set_values(query_conditions);
- TExtPredicate predicate;
- predicate.__set_node_type(TExprNodeType::FUNCTION_CALL);
- predicate.__set_ext_function(match_function);
- disjuncts.push_back(std::move(predicate));
- return true;
- } else if (TExprNodeType::IN_PRED == conjunct->node_type()) {
- // the op code maybe FILTER_NEW_IN, it means there is function in list
- // like col_a in (abs(1))
- if (TExprOpcode::FILTER_IN != conjunct->op() &&
- TExprOpcode::FILTER_NOT_IN != conjunct->op()) {
- return false;
- }
- TExtInPredicate ext_in_predicate;
- std::vector<TExtLiteral> in_pred_values;
- InPredicate* pred = static_cast<InPredicate*>(conjunct);
- ext_in_predicate.__set_is_not_in(pred->is_not_in());
- if (Expr::type_without_cast(pred->get_child(0)) != TExprNodeType::SLOT_REF) {
- return false;
- }
-
- SlotRef* slot_ref = (SlotRef*)(conjunct->get_child(0));
- SlotDescriptor* slot_desc = get_slot_desc(slot_ref);
- if (slot_desc == nullptr) {
- return false;
- }
- TExtColumnDesc columnDesc;
- columnDesc.__set_name(slot_desc->col_name());
- columnDesc.__set_type(slot_desc->type().to_thrift());
- ext_in_predicate.__set_col(columnDesc);
-
- if (pred->get_child(0)->type().type != slot_desc->type().type) {
- if (!ignore_cast(slot_desc, pred->get_child(0))) {
- return false;
- }
- }
-
- HybridSetBase::IteratorBase* iter = pred->hybrid_set()->begin();
- while (iter->has_next()) {
- if (nullptr == iter->get_value()) {
- return false;
- }
- TExtLiteral literal;
- if (!to_ext_literal(slot_desc->type().type, const_cast<void*>(iter->get_value()),
- &literal)) {
- VLOG_CRITICAL << "get disjuncts fail: can't get literal, node_type="
- << slot_desc->type().type;
- return false;
- }
- in_pred_values.push_back(literal);
- iter->next();
- }
- ext_in_predicate.__set_values(in_pred_values);
- TExtPredicate predicate;
- predicate.__set_node_type(TExprNodeType::IN_PRED);
- predicate.__set_in_predicate(ext_in_predicate);
- disjuncts.push_back(std::move(predicate));
- return true;
- } else if (TExprNodeType::COMPOUND_PRED == conjunct->node_type()) {
- if (TExprOpcode::COMPOUND_OR != conjunct->op()) {
- VLOG_CRITICAL << "get disjuncts fail: op is not COMPOUND_OR";
- return false;
- }
- if (!get_disjuncts(context, conjunct->get_child(0), disjuncts)) {
- return false;
- }
- if (!get_disjuncts(context, conjunct->get_child(1), disjuncts)) {
- return false;
- }
- return true;
- } else {
- VLOG_CRITICAL << "get disjuncts fail: node type is " << conjunct->node_type()
- << ", should be BINARY_PRED or COMPOUND_PRED";
- return false;
- }
-}
-
-bool EsScanNode::is_match_func(Expr* conjunct) {
- if (TExprNodeType::FUNCTION_CALL == conjunct->node_type() &&
- conjunct->fn().name.function_name == "esquery") {
- return true;
- }
- return false;
-}
-
-SlotDescriptor* EsScanNode::get_slot_desc(SlotRef* slotRef) {
- std::vector<SlotId> slot_ids;
- slotRef->get_slot_ids(&slot_ids);
- SlotDescriptor* slot_desc = nullptr;
- for (SlotDescriptor* slot : _tuple_desc->slots()) {
- if (slot->id() == slot_ids[0]) {
- slot_desc = slot;
- break;
- }
- }
- return slot_desc;
-}
-
-bool EsScanNode::to_ext_literal(ExprContext* context, Expr* expr, TExtLiteral* literal) {
- switch (expr->node_type()) {
- case TExprNodeType::BOOL_LITERAL:
- case TExprNodeType::INT_LITERAL:
- case TExprNodeType::LARGE_INT_LITERAL:
- case TExprNodeType::FLOAT_LITERAL:
- case TExprNodeType::DECIMAL_LITERAL:
- case TExprNodeType::STRING_LITERAL:
- case TExprNodeType::DATE_LITERAL:
- return to_ext_literal(expr->type().type, context->get_value(expr, nullptr), literal);
- default:
- return false;
- }
-}
-
-bool EsScanNode::to_ext_literal(PrimitiveType slot_type, void* value, TExtLiteral* literal) {
- TExprNodeType::type node_type;
- switch (slot_type) {
- case TYPE_BOOLEAN: {
- node_type = (TExprNodeType::BOOL_LITERAL);
- TBoolLiteral bool_literal;
- bool_literal.__set_value(*reinterpret_cast<bool*>(value));
- literal->__set_bool_literal(bool_literal);
- break;
- }
-
- case TYPE_TINYINT: {
- node_type = (TExprNodeType::INT_LITERAL);
- TIntLiteral int_literal;
- int_literal.__set_value(*reinterpret_cast<int8_t*>(value));
- literal->__set_int_literal(int_literal);
- break;
- }
- case TYPE_SMALLINT: {
- node_type = (TExprNodeType::INT_LITERAL);
- TIntLiteral int_literal;
- int_literal.__set_value(*reinterpret_cast<int16_t*>(value));
- literal->__set_int_literal(int_literal);
- break;
- }
- case TYPE_INT: {
- node_type = (TExprNodeType::INT_LITERAL);
- TIntLiteral int_literal;
- int_literal.__set_value(*reinterpret_cast<int32_t*>(value));
- literal->__set_int_literal(int_literal);
- break;
- }
- case TYPE_BIGINT: {
- node_type = (TExprNodeType::INT_LITERAL);
- TIntLiteral int_literal;
- int_literal.__set_value(*reinterpret_cast<int64_t*>(value));
- literal->__set_int_literal(int_literal);
- break;
- }
-
- case TYPE_LARGEINT: {
- node_type = (TExprNodeType::LARGE_INT_LITERAL);
- TLargeIntLiteral large_int_literal;
- large_int_literal.__set_value(
- LargeIntValue::to_string(*reinterpret_cast<__int128*>(value)));
- literal->__set_large_int_literal(large_int_literal);
- break;
- }
-
- case TYPE_FLOAT: {
- node_type = (TExprNodeType::FLOAT_LITERAL);
- TFloatLiteral float_literal;
- float_literal.__set_value(*reinterpret_cast<float*>(value));
- literal->__set_float_literal(float_literal);
- break;
- }
- case TYPE_DOUBLE: {
- node_type = (TExprNodeType::FLOAT_LITERAL);
- TFloatLiteral float_literal;
- float_literal.__set_value(*reinterpret_cast<double*>(value));
- literal->__set_float_literal(float_literal);
- break;
- }
-
- case TYPE_DATE:
- case TYPE_DATETIME: {
- node_type = (TExprNodeType::DATE_LITERAL);
- const DateTimeValue date_value = *reinterpret_cast<DateTimeValue*>(value);
- char str[MAX_DTVALUE_STR_LEN];
- date_value.to_string(str);
- TDateLiteral date_literal;
- date_literal.__set_value(str);
- literal->__set_date_literal(date_literal);
- break;
- }
-
- case TYPE_CHAR:
- case TYPE_VARCHAR:
- case TYPE_STRING: {
- node_type = (TExprNodeType::STRING_LITERAL);
- TStringLiteral string_literal;
- string_literal.__set_value((reinterpret_cast<StringValue*>(value))->debug_string());
- literal->__set_string_literal(string_literal);
- break;
- }
-
- default: {
- DCHECK(false) << "Invalid type.";
- return false;
- }
- }
- literal->__set_node_type(node_type);
- return true;
-}
-
-Status EsScanNode::get_next_from_es(TExtGetNextResult& result) {
- TExtGetNextParams params;
- params.__set_scan_handle(_scan_handles[_scan_range_idx]);
- params.__set_offset(_offsets[_scan_range_idx]);
-
- // getNext
- const TNetworkAddress& address = _addresses[_scan_range_idx];
-#ifndef BE_TEST
- try {
- Status create_client_status;
- ExtDataSourceServiceClientCache* client_cache = _env->extdatasource_client_cache();
- ExtDataSourceServiceConnection client(client_cache, address, 10000, &create_client_status);
- if (!create_client_status.ok()) {
- LOG(WARNING) << "es create client error: scan_range_idx=" << _scan_range_idx
- << ", address=" << address
- << ", msg=" << create_client_status.get_error_msg();
- return create_client_status;
- }
-
- try {
- VLOG_CRITICAL << "es get_next param=" << apache::thrift::ThriftDebugString(params);
- client->getNext(result, params);
- } catch (apache::thrift::transport::TTransportException& e) {
- std::stringstream ss;
- ss << "es get_next error: scan_range_idx=" << _scan_range_idx << ", msg=" << e.what();
- LOG(WARNING) << ss.str();
- RETURN_IF_ERROR(client.reopen());
- return Status::ThriftRpcError(ss.str());
- }
- } catch (apache::thrift::TException& e) {
- std::stringstream ss;
- ss << "es get_next error: scan_range_idx=" << _scan_range_idx << ", msg=" << e.what();
- LOG(WARNING) << ss.str();
- return Status::ThriftRpcError(ss.str());
- }
-#else
- TStatus status;
- result.__set_status(status);
- result.__set_eos(true);
- TExtColumnData col_data;
- std::vector<bool> is_null;
- is_null.push_back(false);
- col_data.__set_is_null(is_null);
- std::vector<int32_t> int_vals;
- int_vals.push_back(1);
- int_vals.push_back(2);
- col_data.__set_int_vals(int_vals);
- std::vector<TExtColumnData> cols;
- cols.push_back(col_data);
- TExtRowBatch rows;
- rows.__set_cols(cols);
- rows.__set_num_rows(2);
- result.__set_rows(rows);
- return Status(status);
-#endif
-
- // check result
- VLOG_CRITICAL << "es get_next result=" << apache::thrift::ThriftDebugString(result);
- Status get_next_status(result.status);
- if (!get_next_status.ok()) {
- LOG(WARNING) << "es get_next error: scan_range_idx=" << _scan_range_idx
- << ", address=" << address << ", msg=" << get_next_status.get_error_msg();
- return get_next_status;
- }
- if (!result.__isset.rows || !result.rows.__isset.num_rows) {
- std::stringstream ss;
- ss << "es get_next error: scan_range_idx=" << _scan_range_idx
- << ", msg=rows or num_rows not in result";
- LOG(WARNING) << ss.str();
- return Status::InternalError(ss.str());
- }
-
- return Status::OK();
-}
-
-Status EsScanNode::materialize_row(MemPool* tuple_pool, Tuple* tuple,
- const std::vector<TExtColumnData>& cols, int row_idx,
- std::vector<int>& cols_next_val_idx) {
- tuple->init(_tuple_desc->byte_size());
-
- for (int i = 0; i < _tuple_desc->slots().size(); ++i) {
- const SlotDescriptor* slot_desc = _tuple_desc->slots()[i];
-
- if (!slot_desc->is_materialized()) {
- continue;
- }
-
- void* slot = tuple->get_slot(slot_desc->tuple_offset());
- const TExtColumnData& col = cols[i];
-
- if (col.is_null[row_idx]) {
- tuple->set_null(slot_desc->null_indicator_offset());
- continue;
- } else {
- tuple->set_not_null(slot_desc->null_indicator_offset());
- }
-
- int val_idx = cols_next_val_idx[i]++;
- switch (slot_desc->type().type) {
- case TYPE_CHAR:
- case TYPE_VARCHAR:
- case TYPE_STRING: {
- if (val_idx >= col.string_vals.size()) {
- return Status::InternalError(strings::Substitute(ERROR_INVALID_COL_DATA, "STRING"));
- }
- const string& val = col.string_vals[val_idx];
- size_t val_size = val.size();
- Status rst;
- char* buffer =
- reinterpret_cast<char*>(tuple_pool->try_allocate_unaligned(val_size, &rst));
- if (UNLIKELY(buffer == nullptr)) {
- std::string details = strings::Substitute(
- ERROR_MEM_LIMIT_EXCEEDED, "MaterializeNextRow", val_size, "string slot");
- RETURN_LIMIT_EXCEEDED(tuple_pool->mem_tracker(), nullptr, details, val_size, rst);
- }
- memcpy(buffer, val.data(), val_size);
- reinterpret_cast<StringValue*>(slot)->ptr = buffer;
- reinterpret_cast<StringValue*>(slot)->len = val_size;
- break;
- }
- case TYPE_TINYINT:
- if (val_idx >= col.byte_vals.size()) {
- return Status::InternalError(
- strings::Substitute(ERROR_INVALID_COL_DATA, "TINYINT"));
- }
- *reinterpret_cast<int8_t*>(slot) = col.byte_vals[val_idx];
- break;
- case TYPE_SMALLINT:
- if (val_idx >= col.short_vals.size()) {
- return Status::InternalError(
- strings::Substitute(ERROR_INVALID_COL_DATA, "SMALLINT"));
- }
- *reinterpret_cast<int16_t*>(slot) = col.short_vals[val_idx];
- break;
- case TYPE_INT:
- if (val_idx >= col.int_vals.size()) {
- return Status::InternalError(strings::Substitute(ERROR_INVALID_COL_DATA, "INT"));
- }
- *reinterpret_cast<int32_t*>(slot) = col.int_vals[val_idx];
- break;
- case TYPE_BIGINT:
- if (val_idx >= col.long_vals.size()) {
- return Status::InternalError(strings::Substitute(ERROR_INVALID_COL_DATA, "BIGINT"));
- }
- *reinterpret_cast<int64_t*>(slot) = col.long_vals[val_idx];
- break;
- case TYPE_LARGEINT:
- if (val_idx >= col.long_vals.size()) {
- return Status::InternalError(
- strings::Substitute(ERROR_INVALID_COL_DATA, "LARGEINT"));
- }
- *reinterpret_cast<int128_t*>(slot) = col.long_vals[val_idx];
- break;
- case TYPE_DOUBLE:
- if (val_idx >= col.double_vals.size()) {
- return Status::InternalError(strings::Substitute(ERROR_INVALID_COL_DATA, "DOUBLE"));
- }
- *reinterpret_cast<double*>(slot) = col.double_vals[val_idx];
- break;
- case TYPE_FLOAT:
- if (val_idx >= col.double_vals.size()) {
- return Status::InternalError(strings::Substitute(ERROR_INVALID_COL_DATA, "FLOAT"));
- }
- *reinterpret_cast<float*>(slot) = col.double_vals[val_idx];
- break;
- case TYPE_BOOLEAN:
- if (val_idx >= col.bool_vals.size()) {
- return Status::InternalError(
- strings::Substitute(ERROR_INVALID_COL_DATA, "BOOLEAN"));
- }
- *reinterpret_cast<int8_t*>(slot) = col.bool_vals[val_idx];
- break;
- case TYPE_DATE:
- if (val_idx >= col.long_vals.size() ||
- !reinterpret_cast<DateTimeValue*>(slot)->from_unixtime(col.long_vals[val_idx],
- "+08:00")) {
- return Status::InternalError(
- strings::Substitute(ERROR_INVALID_COL_DATA, "TYPE_DATE"));
- }
- reinterpret_cast<DateTimeValue*>(slot)->cast_to_date();
- break;
- case TYPE_DATETIME: {
- if (val_idx >= col.long_vals.size() ||
- !reinterpret_cast<DateTimeValue*>(slot)->from_unixtime(col.long_vals[val_idx],
- "+08:00")) {
- return Status::InternalError(
- strings::Substitute(ERROR_INVALID_COL_DATA, "TYPE_DATETIME"));
- }
- reinterpret_cast<DateTimeValue*>(slot)->set_type(TIME_DATETIME);
- break;
- }
- default:
- DCHECK(false);
- }
- }
- return Status::OK();
-}
-
-} // namespace doris
diff --git a/be/src/exec/es_scan_node.h b/be/src/exec/es_scan_node.h
deleted file mode 100644
index 173cddaf68..0000000000
--- a/be/src/exec/es_scan_node.h
+++ /dev/null
@@ -1,88 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#pragma once
-
-#include <memory>
-#include <vector>
-
-#include "exec/scan_node.h"
-#include "exprs/slot_ref.h"
-#include "gen_cpp/PaloExternalDataSourceService_types.h"
-#include "gen_cpp/TExtDataSourceService.h"
-#include "runtime/descriptors.h"
-#include "runtime/exec_env.h"
-#include "runtime/tuple.h"
-
-namespace doris {
-
-class TupleDescriptor;
-class RuntimeState;
-class Status;
-
-class EsScanNode : public ScanNode {
-public:
- EsScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
- ~EsScanNode();
-
- virtual Status prepare(RuntimeState* state) override;
- virtual Status open(RuntimeState* state) override;
- virtual Status get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) override;
- virtual Status close(RuntimeState* state) override;
- virtual Status set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) override;
-
-protected:
- // Write debug string of this into out.
- virtual void debug_string(int indentation_level, std::stringstream* out) const override;
-
-private:
- Status open_es(TNetworkAddress& address, TExtOpenResult& result, TExtOpenParams& params);
- Status materialize_row(MemPool* tuple_pool, Tuple* tuple, const vector<TExtColumnData>& cols,
- int next_row_idx, vector<int>& cols_next_val_idx);
- Status get_next_from_es(TExtGetNextResult& result);
-
- bool get_disjuncts(ExprContext* context, Expr* conjunct, vector<TExtPredicate>& disjuncts);
- bool to_ext_literal(ExprContext* context, Expr* expr, TExtLiteral* literal);
- bool to_ext_literal(PrimitiveType node_type, void* value, TExtLiteral* literal);
- bool ignore_cast(SlotDescriptor* slot, Expr* expr);
-
- bool is_match_func(Expr* conjunct);
-
- SlotDescriptor* get_slot_desc(SlotRef* slotRef);
-
- // check if open result meets condition
- // 1. check if left conjuncts contain "match" function, since match function could only be executed on es
- bool check_left_conjuncts(Expr* conjunct);
-
-private:
- TupleId _tuple_id;
- std::map<std::string, std::string> _properties;
- const TupleDescriptor* _tuple_desc;
- ExecEnv* _env;
- std::vector<TEsScanRange> _scan_ranges;
-
- // scan range's iterator, used in get_next()
- int _scan_range_idx;
-
- // store every scan range's netaddress/handle/offset
- std::vector<TNetworkAddress> _addresses;
- std::vector<std::string> _scan_handles;
- std::vector<int> _offsets;
- std::vector<ExprContext*> _pushdown_conjunct_ctxs;
-};
-
-} // namespace doris
diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp
index 4dbd827605..ccd8e3c854 100644
--- a/be/src/exec/exec_node.cpp
+++ b/be/src/exec/exec_node.cpp
@@ -33,7 +33,6 @@
#include "exec/cross_join_node.h"
#include "exec/empty_set_node.h"
#include "exec/es_http_scan_node.h"
-#include "exec/es_scan_node.h"
#include "exec/except_node.h"
#include "exec/exchange_node.h"
#include "exec/hash_join_node.h"
@@ -428,10 +427,6 @@ Status ExecNode::create_node(RuntimeState* state, ObjectPool* pool, const TPlanN
*node = pool->add(new OdbcScanNode(pool, tnode, descs));
return Status::OK();
- case TPlanNodeType::ES_SCAN_NODE:
- *node = pool->add(new EsScanNode(pool, tnode, descs));
- return Status::OK();
-
case TPlanNodeType::ES_HTTP_SCAN_NODE:
if (state->enable_vectorized_exec()) {
*node = pool->add(new vectorized::VEsHttpScanNode(pool, tnode, descs));
@@ -662,7 +657,6 @@ void ExecNode::collect_nodes(TPlanNodeType::type node_type, std::vector<ExecNode
void ExecNode::collect_scan_nodes(vector<ExecNode*>* nodes) {
collect_nodes(TPlanNodeType::OLAP_SCAN_NODE, nodes);
collect_nodes(TPlanNodeType::BROKER_SCAN_NODE, nodes);
- collect_nodes(TPlanNodeType::ES_SCAN_NODE, nodes);
collect_nodes(TPlanNodeType::ES_HTTP_SCAN_NODE, nodes);
}
diff --git a/be/src/exprs/expr_context.h b/be/src/exprs/expr_context.h
index 32d408e99d..7be7a8f15f 100644
--- a/be/src/exprs/expr_context.h
+++ b/be/src/exprs/expr_context.h
@@ -160,7 +160,6 @@ private:
friend class RuntimePredicateWrapper;
friend class BloomFilterPredicate;
friend class OlapScanNode;
- friend class EsScanNode;
friend class EsPredicate;
/// FunctionContexts for each registered expression. The FunctionContexts are created
diff --git a/be/src/gen_cpp/CMakeLists.txt b/be/src/gen_cpp/CMakeLists.txt
index 22aa8c9cfe..c4b2f0dc14 100644
--- a/be/src/gen_cpp/CMakeLists.txt
+++ b/be/src/gen_cpp/CMakeLists.txt
@@ -37,9 +37,6 @@ set(SRC_FILES
${GEN_CPP_DIR}/HeartbeatService_types.cpp
${GEN_CPP_DIR}/PaloInternalService_constants.cpp
${GEN_CPP_DIR}/PaloInternalService_types.cpp
- ${GEN_CPP_DIR}/PaloExternalDataSourceService_constants.cpp
- ${GEN_CPP_DIR}/PaloExternalDataSourceService_types.cpp
- ${GEN_CPP_DIR}/TExtDataSourceService.cpp
${GEN_CPP_DIR}/FrontendService.cpp
${GEN_CPP_DIR}/FrontendService_constants.cpp
${GEN_CPP_DIR}/FrontendService_types.cpp
diff --git a/be/src/runtime/client_cache.h b/be/src/runtime/client_cache.h
index e29ecc632e..418917ab29 100644
--- a/be/src/runtime/client_cache.h
+++ b/be/src/runtime/client_cache.h
@@ -289,8 +289,5 @@ using FrontendServiceConnection = ClientConnection<FrontendServiceClient>;
class TPaloBrokerServiceClient;
using BrokerServiceClientCache = ClientCache<TPaloBrokerServiceClient>;
using BrokerServiceConnection = ClientConnection<TPaloBrokerServiceClient>;
-class TExtDataSourceServiceClient;
-using ExtDataSourceServiceClientCache = ClientCache<TExtDataSourceServiceClient>;
-using ExtDataSourceServiceConnection = ClientConnection<TExtDataSourceServiceClient>;
} // namespace doris
diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index 4a279d8142..6fda2e6a1f 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -64,7 +64,6 @@ class SmallFileMgr;
class BackendServiceClient;
class FrontendServiceClient;
class TPaloBrokerServiceClient;
-class TExtDataSourceServiceClient;
class PBackendService_Stub;
class PFunctionService_Stub;
@@ -108,9 +107,6 @@ public:
ClientCache<BackendServiceClient>* client_cache() { return _backend_client_cache; }
ClientCache<FrontendServiceClient>* frontend_client_cache() { return _frontend_client_cache; }
ClientCache<TPaloBrokerServiceClient>* broker_client_cache() { return _broker_client_cache; }
- ClientCache<TExtDataSourceServiceClient>* extdatasource_client_cache() {
- return _extdatasource_client_cache;
- }
// using template to simplify client cache management
template <typename T>
@@ -184,7 +180,6 @@ private:
ClientCache<BackendServiceClient>* _backend_client_cache = nullptr;
ClientCache<FrontendServiceClient>* _frontend_client_cache = nullptr;
ClientCache<TPaloBrokerServiceClient>* _broker_client_cache = nullptr;
- ClientCache<TExtDataSourceServiceClient>* _extdatasource_client_cache = nullptr;
ThreadResourceMgr* _thread_mgr = nullptr;
// The ancestor for all querys tracker.
@@ -248,10 +243,5 @@ inline ClientCache<TPaloBrokerServiceClient>*
ExecEnv::get_client_cache<TPaloBrokerServiceClient>() {
return _broker_client_cache;
}
-template <>
-inline ClientCache<TExtDataSourceServiceClient>*
-ExecEnv::get_client_cache<TExtDataSourceServiceClient>() {
- return _extdatasource_client_cache;
-}
} // namespace doris
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index 0c9a6edc84..ff5b847810 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -20,7 +20,6 @@
#include "common/logging.h"
#include "gen_cpp/BackendService.h"
#include "gen_cpp/HeartbeatService_types.h"
-#include "gen_cpp/TExtDataSourceService.h"
#include "gen_cpp/TPaloBrokerService.h"
#include "olap/page_cache.h"
#include "olap/segment_loader.h"
@@ -95,8 +94,6 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths) {
_backend_client_cache = new BackendServiceClientCache(config::max_client_cache_size_per_host);
_frontend_client_cache = new FrontendServiceClientCache(config::max_client_cache_size_per_host);
_broker_client_cache = new BrokerServiceClientCache(config::max_client_cache_size_per_host);
- _extdatasource_client_cache =
- new ExtDataSourceServiceClientCache(config::max_client_cache_size_per_host);
_task_pool_mem_tracker_registry.reset(new MemTrackerTaskPool());
_thread_mgr = new ThreadResourceMgr();
if (config::doris_enable_scanner_thread_pool_per_disk &&
@@ -148,7 +145,6 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths) {
_backend_client_cache->init_metrics("backend");
_frontend_client_cache->init_metrics("frontend");
_broker_client_cache->init_metrics("broker");
- _extdatasource_client_cache->init_metrics("extdatasource");
_result_mgr->init();
_cgroups_mgr->init_cgroups();
_etl_job_mgr->init();
@@ -323,7 +319,6 @@ void ExecEnv::_destroy() {
SAFE_DELETE(_scan_thread_pool);
SAFE_DELETE(_thread_mgr);
SAFE_DELETE(_broker_client_cache);
- SAFE_DELETE(_extdatasource_client_cache);
SAFE_DELETE(_frontend_client_cache);
SAFE_DELETE(_backend_client_cache);
SAFE_DELETE(_result_mgr);
diff --git a/be/src/util/thrift_rpc_helper.cpp b/be/src/util/thrift_rpc_helper.cpp
index 1083df6bb4..94ad060250 100644
--- a/be/src/util/thrift_rpc_helper.cpp
+++ b/be/src/util/thrift_rpc_helper.cpp
@@ -96,9 +96,4 @@ template Status ThriftRpcHelper::rpc<TPaloBrokerServiceClient>(
const std::string& ip, const int32_t port,
std::function<void(ClientConnection<TPaloBrokerServiceClient>&)> callback, int timeout_ms);
-template Status ThriftRpcHelper::rpc<TExtDataSourceServiceClient>(
- const std::string& ip, const int32_t port,
- std::function<void(ClientConnection<TExtDataSourceServiceClient>&)> callback,
- int timeout_ms);
-
} // namespace doris
diff --git a/be/test/CMakeLists.txt b/be/test/CMakeLists.txt
index 6b79813060..f39cb68926 100644
--- a/be/test/CMakeLists.txt
+++ b/be/test/CMakeLists.txt
@@ -86,7 +86,6 @@ set(EXEC_TEST_FILES
# exec/schema_scanner/schema_collations_scanner_test.cpp
# exec/schema_scanner/schema_charsets_scanner_test.cpp
# exec/broker_reader_test.cpp
- # exec/es_scan_node_test.cpp
)
if(DEFINED DORIS_WITH_LZO)
diff --git a/be/test/exec/es_scan_node_test.cpp b/be/test/exec/es_scan_node_test.cpp
deleted file mode 100644
index c647170f53..0000000000
--- a/be/test/exec/es_scan_node_test.cpp
+++ /dev/null
@@ -1,147 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "exec/es_scan_node.h"
-
-#include <gtest/gtest.h>
-
-#include <string>
-
-#include "common/object_pool.h"
-#include "gen_cpp/PlanNodes_types.h"
-#include "runtime/descriptors.h"
-#include "runtime/mem_pool.h"
-#include "runtime/row_batch.h"
-#include "runtime/runtime_state.h"
-#include "runtime/string_value.h"
-#include "runtime/tuple_row.h"
-#include "util/debug_util.h"
-#include "util/runtime_profile.h"
-
-using std::vector;
-
-namespace doris {
-
-// mock
-class EsScanNodeTest : public testing::Test {
-public:
- EsScanNodeTest() : _runtime_state(TQueryGlobals()) {
- _runtime_state._instance_mem_tracker.reset(new MemTracker());
- TDescriptorTable t_desc_table;
-
- // table descriptors
- TTableDescriptor t_table_desc;
-
- t_table_desc.id = 0;
- t_table_desc.tableType = TTableType::ES_TABLE;
- t_table_desc.numCols = 0;
- t_table_desc.numClusteringCols = 0;
- t_table_desc.__isset.esTable = true;
- t_desc_table.tableDescriptors.push_back(t_table_desc);
- t_desc_table.__isset.tableDescriptors = true;
- // TSlotDescriptor
- int offset = 1;
- int i = 0;
- // id
- {
- TSlotDescriptor t_slot_desc;
- t_slot_desc.__set_slotType(TypeDescriptor(TYPE_INT).to_thrift());
- t_slot_desc.__set_columnPos(i);
- t_slot_desc.__set_byteOffset(offset);
- t_slot_desc.__set_nullIndicatorByte(0);
- t_slot_desc.__set_nullIndicatorBit(-1);
- t_slot_desc.__set_slotIdx(i);
- t_slot_desc.__set_isMaterialized(true);
- t_desc_table.slotDescriptors.push_back(t_slot_desc);
- offset += sizeof(int);
- }
-
- TTupleDescriptor t_tuple_desc;
- t_tuple_desc.id = 0;
- t_tuple_desc.byteSize = offset;
- t_tuple_desc.numNullBytes = 1;
- t_tuple_desc.tableId = 0;
- t_tuple_desc.__isset.tableId = true;
- t_desc_table.__isset.slotDescriptors = true;
- t_desc_table.tupleDescriptors.push_back(t_tuple_desc);
-
- DescriptorTbl::create(&_obj_pool, t_desc_table, &_desc_tbl);
- _runtime_state.set_desc_tbl(_desc_tbl);
-
- // Node Id
- _tnode.node_id = 0;
- _tnode.node_type = TPlanNodeType::SCHEMA_SCAN_NODE;
- _tnode.num_children = 0;
- _tnode.limit = -1;
- _tnode.row_tuples.push_back(0);
- _tnode.nullable_tuples.push_back(false);
- _tnode.es_scan_node.tuple_id = 0;
- std::map<std::string, std::string> properties;
- _tnode.es_scan_node.__set_properties(properties);
- _tnode.__isset.es_scan_node = true;
- }
-
-protected:
- virtual void SetUp() {}
- virtual void TearDown() {}
- TPlanNode _tnode;
- ObjectPool _obj_pool;
- DescriptorTbl* _desc_tbl;
- RuntimeState _runtime_state;
-};
-
-TEST_F(EsScanNodeTest, normal_use) {
- EsScanNode scan_node(&_obj_pool, _tnode, *_desc_tbl);
- Status status = scan_node.prepare(&_runtime_state);
- EXPECT_TRUE(status.ok());
- TEsScanRange es_scan_range;
- es_scan_range.__set_index("index1");
- es_scan_range.__set_type("docs");
- es_scan_range.__set_shard_id(0);
- TNetworkAddress es_host;
- es_host.__set_hostname("host");
- es_host.__set_port(8200);
- std::vector<TNetworkAddress> es_hosts;
- es_hosts.push_back(es_host);
- es_scan_range.__set_es_hosts(es_hosts);
- TScanRange scan_range;
- scan_range.__set_es_scan_range(es_scan_range);
- TScanRangeParams scan_range_params;
- scan_range_params.__set_scan_range(scan_range);
- std::vector<TScanRangeParams> scan_ranges;
- scan_ranges.push_back(scan_range_params);
-
- status = scan_node.set_scan_ranges(scan_ranges);
- EXPECT_TRUE(status.ok());
- std::stringstream out;
- scan_node.debug_string(1, &out);
- LOG(WARNING) << out.str();
-
- status = scan_node.open(&_runtime_state);
- EXPECT_TRUE(status.ok());
- RowBatch row_batch(scan_node._row_descriptor, _runtime_state.batch_size());
- bool eos = false;
- status = scan_node.get_next(&_runtime_state, &row_batch, &eos);
- EXPECT_TRUE(status.ok());
- EXPECT_EQ(2, row_batch.num_rows());
- EXPECT_TRUE(eos);
-
- status = scan_node.close(&_runtime_state);
- EXPECT_TRUE(status.ok());
-}
-
-} // namespace doris
diff --git a/gensrc/thrift/PaloExternalDataSourceService.thrift b/gensrc/thrift/PaloExternalDataSourceService.thrift
deleted file mode 100644
index c4cce76d19..0000000000
--- a/gensrc/thrift/PaloExternalDataSourceService.thrift
+++ /dev/null
@@ -1,250 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-namespace java org.apache.doris.thrift
-namespace cpp doris
-
-include "Exprs.thrift"
-include "Opcodes.thrift"
-include "Status.thrift"
-include "Types.thrift"
-
-// A result set column descriptor.
-// this definition id different from column desc in palo, the column desc in palo only support scalar type, does not support map, array
-// so that should convert palo column desc into ExtColumnDesc
-struct TExtColumnDesc {
- // The column name as given in the Create .. statement. Always set.
- 1: optional string name
- // The column type. Always set.
- 2: optional Types.TTypeDesc type
-}
-
-// Metadata used to describe the schema (column names, types, comments)
-// of result sets.
-struct TExtTableSchema {
- // List of columns. Always set.
- 1: optional list<TExtColumnDesc> cols
-}
-
-struct TExtLiteral {
- 1: required Exprs.TExprNodeType node_type
- 2: optional Exprs.TBoolLiteral bool_literal
- 3: optional Exprs.TDateLiteral date_literal
- 4: optional Exprs.TFloatLiteral float_literal
- 5: optional Exprs.TIntLiteral int_literal
- 6: optional Exprs.TStringLiteral string_literal
- 7: optional Exprs.TDecimalLiteral decimal_literal
- 8: optional Exprs.TLargeIntLiteral large_int_literal
-}
-
-// Binary predicates that can be pushed to the external data source and
-// are of the form <col> <op> <val>. Sources can choose to accept or reject
-// predicates via the return value of prepare(), see TPrepareResult.
-// The column and the value are guaranteed to be type compatible in Impala,
-// but they are not necessarily the same type, so the data source
-// implementation may need to do an implicit cast.
-// > < = != >= <=
-struct TExtBinaryPredicate {
- // Column on which the predicate is applied. Always set.
- 1: optional TExtColumnDesc col
- // Comparison operator. Always set.
- 2: optional Opcodes.TExprOpcode op
- // Value on the right side of the binary predicate. Always set.
- 3: optional TExtLiteral value
-}
-
-struct TExtInPredicate {
- 1: optional bool is_not_in
- // Column on which the predicate is applied. Always set.
- 2: optional TExtColumnDesc col
- // Value on the right side of the binary predicate. Always set.
- 3: optional list<TExtLiteral> values
-}
-
-struct TExtLikePredicate {
- 1: optional TExtColumnDesc col
- 2: optional TExtLiteral value
-}
-
-struct TExtIsNullPredicate {
- 1: optional bool is_not_null
- 2: optional TExtColumnDesc col
-}
-
-struct TExtFunction {
- 1: optional string func_name
- // input parameter column descs
- 2: optional list<TExtColumnDesc> cols
- // input parameter column literals
- 3: optional list<TExtLiteral> values
-}
-
-// a union of all predicates
-struct TExtPredicate {
- 1: required Exprs.TExprNodeType node_type
- 2: optional TExtBinaryPredicate binary_predicate
- 3: optional TExtInPredicate in_predicate
- 4: optional TExtLikePredicate like_predicate
- 5: optional TExtIsNullPredicate is_null_predicate
- 6: optional TExtFunction ext_function
-}
-
-// A union over all possible return types for a column of data
-// Currently only used by ExternalDataSource types
-//
-struct TExtColumnData {
- // One element in the list for every row in the column indicating if there is
- // a value in the vals list or a null.
- 1: required list<bool> is_null;
-
- // Only one is set, only non-null values are set. this indicates one column data for a row batch
- 2: optional list<bool> bool_vals;
- 3: optional binary byte_vals;
- 4: optional list<i16> short_vals;
- 5: optional list<i32> int_vals;
- 6: optional list<i64> long_vals;
- 7: optional list<double> double_vals;
- 8: optional list<string> string_vals;
- 9: optional list<binary> binary_vals;
-}
-
-// Serialized batch of rows returned by getNext().
-// one row batch contains mult rows, and the result is arranged in column style
-struct TExtRowBatch {
- // Each TColumnData contains the data for an entire column. Always set.
- 1: optional list<TExtColumnData> cols
-
- // The number of rows returned. For count(*) queries, there may not be
- // any materialized columns so cols will be an empty list and this value
- // will indicate how many rows are returned. When there are materialized
- // columns, this number should be the same as the size of each
- // TColumnData.is_null list.
- 2: optional i64 num_rows
-}
-
-// Parameters to prepare().
-struct TExtPrepareParams {
- // The name of the table. Always set.
- 1: optional string table_name
-
- // A string specified for the table that is passed to the external data source.
- // Always set, may be an empty string.
- 2: optional string init_string
-
- // A list of conjunctive (AND) clauses, each of which contains a list of
- // disjunctive (OR) binary predicates. Always set, may be an empty list.
- 3: optional list<list<TExtPredicate>> predicates
-}
-
-// Returned by prepare().
-struct TExtPrepareResult {
- 1: required Status.TStatus status
-
- // Estimate of the total number of rows returned when applying the predicates indicated
- // by accepted_conjuncts. Not set if the data source does not support providing
- // this statistic.
- 2: optional i64 num_rows_estimate
-
- // Accepted conjuncts. References the 'predicates' parameter in the prepare()
- // call. It contains the 0-based indices of the top-level list elements (the
- // AND elements) that the library will be able to apply during the scan. Those
- // elements that aren’t referenced in accepted_conjuncts will be evaluated by
- // Impala itself.
- 3: optional list<i32> accepted_conjuncts
-}
-
-// Parameters to open().
-struct TExtOpenParams {
- // A unique identifier for the query. Always set.
- 1: optional Types.TUniqueId query_id
-
- // The name of the table. Always set.
- 2: optional string table_name
-
- // A string specified for the table that is passed to the external data source.
- // Always set, may be an empty string.
- 3: optional map<string,string> properties
-
- // The authenticated user name. Always set.
- 4: optional string authenticated_user_name
-
- // The schema of the rows that the scan needs to return. Always set.
- 5: optional TExtTableSchema row_schema
-
- // The expected size of the row batches it returns in the subsequent getNext() calls.
- // Always set.
- 6: optional i32 batch_size
-
- 7: optional list<list<TExtPredicate>> predicates
-
- // The query limit, if specified.
- 8: optional i64 limit
-}
-
-// Returned by open().
-struct TExtOpenResult {
- 1: required Status.TStatus status
-
- // An opaque handle used in subsequent getNext()/close() calls. Required.
- 2: optional string scan_handle
- 3: optional list<i32> accepted_conjuncts
-}
-
-// Parameters to getNext()
-struct TExtGetNextParams {
- // The opaque handle returned by the previous open() call. Always set.
- 1: optional string scan_handle // es search context id
- 2: optional i64 offset // es should check the offset to prevent duplicate rpc calls
-}
-
-// Returned by getNext().
-struct TExtGetNextResult {
- 1: required Status.TStatus status
-
- // If true, reached the end of the result stream; subsequent calls to
- // getNext() won’t return any more results. Required.
- 2: optional bool eos
-
- // A batch of rows to return, if any exist. The number of rows in the batch
- // should be less than or equal to the batch_size specified in TOpenParams.
- 3: optional TExtRowBatch rows
-}
-
-// Parameters to close()
-struct TExtCloseParams {
- // The opaque handle returned by the previous open() call. Always set.
- 1: optional string scan_handle
-}
-
-// Returned by close().
-struct TExtCloseResult {
- 1: required Status.TStatus status
-}
-
-// This data source can be considered as the entry of palo's unified external data source
-service TExtDataSourceService {
- // 1. palo be call this api to send index, type, shard id to es
- // 2. es will open a search context and prepare data, will return a context id
- TExtOpenResult open(1: TExtOpenParams params);
- // 1. palo be will send a search context id to es
- // 2. es will find the search context and find a batch rows and send to palo
- // 3. palo will run the remaining predicates when receving data
- // 4. es should check the offset when receive the request
- TExtGetNextResult getNext(1: TExtGetNextParams params);
- // 1. es will release the context when receiving the data
- TExtCloseResult close(1: TExtCloseParams params);
-}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org