You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2022/05/14 02:03:59 UTC

[incubator-doris] branch master updated: [refactor](es) Clean es tcp scannode and related thrift definitions (#9553)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new cd105bee0a [refactor](es) Clean es tcp scannode and related thrift definitions (#9553)
cd105bee0a is described below

commit cd105bee0afe4bb00a872562e547f72753e7e43a
Author: yiguolei <67...@qq.com>
AuthorDate: Sat May 14 10:03:55 2022 +0800

    [refactor](es) Clean es tcp scannode and related thrift definitions (#9553)
    
    PaloExternalDataSourceService was designed for es_scan_node, which uses the TCP protocol.
    However, the ES TCP protocol requires deploying a TCP jar into ES itself. Both the ES version and the Lucene version have since been upgraded,
    and the TCP jar is no longer maintained.
    
    Therefore, this commit removes all the related code and thrift definitions.
---
 be/src/exec/CMakeLists.txt                         |   1 -
 be/src/exec/es/es_predicate.h                      |   1 -
 be/src/exec/es_scan_node.cpp                       | 868 ---------------------
 be/src/exec/es_scan_node.h                         |  88 ---
 be/src/exec/exec_node.cpp                          |   6 -
 be/src/exprs/expr_context.h                        |   1 -
 be/src/gen_cpp/CMakeLists.txt                      |   3 -
 be/src/runtime/client_cache.h                      |   3 -
 be/src/runtime/exec_env.h                          |  10 -
 be/src/runtime/exec_env_init.cpp                   |   5 -
 be/src/util/thrift_rpc_helper.cpp                  |   5 -
 be/test/CMakeLists.txt                             |   1 -
 be/test/exec/es_scan_node_test.cpp                 | 147 ----
 gensrc/thrift/PaloExternalDataSourceService.thrift | 250 ------
 14 files changed, 1389 deletions(-)

diff --git a/be/src/exec/CMakeLists.txt b/be/src/exec/CMakeLists.txt
index 709cb4de84..202ae767b0 100644
--- a/be/src/exec/CMakeLists.txt
+++ b/be/src/exec/CMakeLists.txt
@@ -57,7 +57,6 @@ set(EXEC_FILES
     csv_scan_node.cpp
     csv_scanner.cpp
     table_function_node.cpp
-    es_scan_node.cpp
     es_http_scan_node.cpp
     es_http_scanner.cpp
     es/es_predicate.cpp
diff --git a/be/src/exec/es/es_predicate.h b/be/src/exec/es/es_predicate.h
index 28552d3eee..05c4e2b946 100644
--- a/be/src/exec/es/es_predicate.h
+++ b/be/src/exec/es/es_predicate.h
@@ -23,7 +23,6 @@
 #include "exprs/slot_ref.h"
 #include "gen_cpp/Exprs_types.h"
 #include "gen_cpp/Opcodes_types.h"
-#include "gen_cpp/PaloExternalDataSourceService_types.h"
 #include "runtime/descriptors.h"
 #include "runtime/primitive_type.h"
 #include "runtime/tuple.h"
diff --git a/be/src/exec/es_scan_node.cpp b/be/src/exec/es_scan_node.cpp
deleted file mode 100644
index a0a89d3a68..0000000000
--- a/be/src/exec/es_scan_node.cpp
+++ /dev/null
@@ -1,868 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "es_scan_node.h"
-
-#include <gutil/strings/substitute.h>
-
-#include <boost/algorithm/string.hpp>
-#include <string>
-
-#include "exprs/expr.h"
-#include "exprs/expr_context.h"
-#include "exprs/in_predicate.h"
-#include "exprs/slot_ref.h"
-#include "gen_cpp/Exprs_types.h"
-#include "gen_cpp/PlanNodes_types.h"
-#include "olap/olap_common.h"
-#include "olap/utils.h"
-#include "runtime/client_cache.h"
-#include "runtime/row_batch.h"
-#include "runtime/runtime_state.h"
-#include "runtime/string_value.h"
-#include "runtime/tuple_row.h"
-#include "service/backend_options.h"
-#include "util/debug_util.h"
-#include "util/runtime_profile.h"
-
-namespace doris {
-
-// $0 = column type (e.g. INT)
-const std::string ERROR_INVALID_COL_DATA =
-        "Data source returned inconsistent column data. "
-        "Expected value of type $0 based on column metadata. This likely indicates a "
-        "problem with the data source library.";
-const std::string ERROR_MEM_LIMIT_EXCEEDED =
-        "DataSourceScanNode::$0() failed to allocate "
-        "$1 bytes for $2.";
-
-EsScanNode::EsScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
-        : ScanNode(pool, tnode, descs),
-          _tuple_id(tnode.es_scan_node.tuple_id),
-          _tuple_desc(nullptr),
-          _env(nullptr),
-          _scan_range_idx(0) {
-    if (tnode.es_scan_node.__isset.properties) {
-        _properties = tnode.es_scan_node.properties;
-    }
-}
-
-EsScanNode::~EsScanNode() {}
-
-Status EsScanNode::prepare(RuntimeState* state) {
-    VLOG_CRITICAL << "EsScanNode::Prepare";
-
-    RETURN_IF_ERROR(ScanNode::prepare(state));
-    SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
-    _tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id);
-    if (_tuple_desc == nullptr) {
-        std::stringstream ss;
-        ss << "es tuple descriptor is null, _tuple_id=" << _tuple_id;
-        LOG(WARNING) << ss.str();
-        return Status::InternalError(ss.str());
-    }
-    _env = state->exec_env();
-
-    return Status::OK();
-}
-
-Status EsScanNode::open(RuntimeState* state) {
-    VLOG_CRITICAL << "EsScanNode::Open";
-
-    RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::OPEN));
-    RETURN_IF_CANCELLED(state);
-    SCOPED_TIMER(_runtime_profile->total_time_counter());
-    SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
-    RETURN_IF_ERROR(ExecNode::open(state));
-
-    // TExtOpenParams.row_schema
-    std::vector<TExtColumnDesc> cols;
-    for (const SlotDescriptor* slot : _tuple_desc->slots()) {
-        TExtColumnDesc col;
-        col.__set_name(slot->col_name());
-        col.__set_type(slot->type().to_thrift());
-        cols.emplace_back(std::move(col));
-    }
-    TExtTableSchema row_schema;
-    row_schema.cols = std::move(cols);
-    row_schema.__isset.cols = true;
-
-    // TExtOpenParams.predicates
-    std::vector<vector<TExtPredicate>> predicates;
-    std::vector<int> predicate_to_conjunct;
-    for (int i = 0; i < _conjunct_ctxs.size(); ++i) {
-        VLOG_CRITICAL << "conjunct: " << _conjunct_ctxs[i]->root()->debug_string();
-        std::vector<TExtPredicate> disjuncts;
-        if (get_disjuncts(_conjunct_ctxs[i], _conjunct_ctxs[i]->root(), disjuncts)) {
-            predicates.emplace_back(std::move(disjuncts));
-            predicate_to_conjunct.push_back(i);
-        }
-    }
-
-    // open every scan range
-    std::vector<int> conjunct_accepted_times(_conjunct_ctxs.size(), 0);
-    for (int i = 0; i < _scan_ranges.size(); ++i) {
-        TEsScanRange& es_scan_range = _scan_ranges[i];
-
-        if (es_scan_range.es_hosts.empty()) {
-            std::stringstream ss;
-            ss << "es fail to open: hosts empty";
-            LOG(WARNING) << ss.str();
-            return Status::InternalError(ss.str());
-        }
-
-        // TExtOpenParams
-        TExtOpenParams params;
-        params.__set_query_id(state->query_id());
-        _properties["index"] = es_scan_range.index;
-        if (es_scan_range.__isset.type) {
-            _properties["type"] = es_scan_range.type;
-        }
-        _properties["shard_id"] = std::to_string(es_scan_range.shard_id);
-        params.__set_properties(_properties);
-        params.__set_row_schema(row_schema);
-        params.__set_batch_size(state->batch_size());
-        params.__set_predicates(predicates);
-        TExtOpenResult result;
-
-        // choose an es node, local is the first choice
-        std::string localhost = BackendOptions::get_localhost();
-        bool is_success = false;
-        for (int j = 0; j < 2; ++j) {
-            for (auto& es_host : es_scan_range.es_hosts) {
-                if ((j == 0 && es_host.hostname != localhost) ||
-                    (j == 1 && es_host.hostname == localhost)) {
-                    continue;
-                }
-                Status status = open_es(es_host, result, params);
-                if (status.ok()) {
-                    is_success = true;
-                    _addresses.push_back(es_host);
-                    _scan_handles.push_back(result.scan_handle);
-                    if (result.__isset.accepted_conjuncts) {
-                        for (int index : result.accepted_conjuncts) {
-                            conjunct_accepted_times[predicate_to_conjunct[index]]++;
-                        }
-                    }
-                    break;
-                } else if (status.code() == TStatusCode::ES_SHARD_NOT_FOUND) {
-                    // if shard not found, try other nodes
-                    LOG(WARNING) << "shard not found on es node: "
-                                 << ", address=" << es_host << ", scan_range_idx=" << i
-                                 << ", try other nodes";
-                } else {
-                    LOG(WARNING) << "es open error: scan_range_idx=" << i << ", address=" << es_host
-                                 << ", msg=" << status.get_error_msg();
-                    return status;
-                }
-            }
-            if (is_success) {
-                break;
-            }
-        }
-
-        if (!is_success) {
-            std::stringstream ss;
-            ss << "es open error: scan_range_idx=" << i << ", can't find shard on any node";
-            return Status::InternalError(ss.str());
-        }
-    }
-
-    // remove those conjuncts that accepted by all scan ranges
-    for (int i = predicate_to_conjunct.size() - 1; i >= 0; i--) {
-        int conjunct_index = predicate_to_conjunct[i];
-        if (conjunct_accepted_times[conjunct_index] == _scan_ranges.size()) {
-            _pushdown_conjunct_ctxs.push_back(*(_conjunct_ctxs.begin() + conjunct_index));
-            _conjunct_ctxs.erase(_conjunct_ctxs.begin() + conjunct_index);
-        }
-    }
-
-    for (int i = 0; i < _conjunct_ctxs.size(); ++i) {
-        if (!check_left_conjuncts(_conjunct_ctxs[i]->root())) {
-            return Status::InternalError(
-                    "esquery could only be executed on es, but could not push down to es");
-        }
-    }
-
-    return Status::OK();
-}
-
-Status EsScanNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) {
-    VLOG_CRITICAL << "EsScanNode::GetNext";
-
-    RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::GETNEXT));
-    RETURN_IF_CANCELLED(state);
-    SCOPED_TIMER(_runtime_profile->total_time_counter());
-    SCOPED_SWITCH_TASK_THREAD_LOCAL_EXISTED_MEM_TRACKER(mem_tracker());
-
-    // create tuple
-    MemPool* tuple_pool = row_batch->tuple_data_pool();
-    int64_t tuple_buffer_size;
-    uint8_t* tuple_buffer = nullptr;
-    RETURN_IF_ERROR(
-            row_batch->resize_and_allocate_tuple_buffer(state, &tuple_buffer_size, &tuple_buffer));
-    Tuple* tuple = reinterpret_cast<Tuple*>(tuple_buffer);
-
-    // get batch
-    TExtGetNextResult result;
-    RETURN_IF_ERROR(get_next_from_es(result));
-    _offsets[_scan_range_idx] += result.rows.num_rows;
-
-    // convert
-    VLOG_CRITICAL << "begin to convert: scan_range_idx=" << _scan_range_idx
-                  << ", num_rows=" << result.rows.num_rows;
-    std::vector<TExtColumnData>& cols = result.rows.cols;
-    // indexes of the next non-null value in the row batch, per column.
-    std::vector<int> cols_next_val_idx(_tuple_desc->slots().size(), 0);
-    for (int row_idx = 0; row_idx < result.rows.num_rows; row_idx++) {
-        if (reached_limit()) {
-            *eos = true;
-            break;
-        }
-        RETURN_IF_ERROR(materialize_row(tuple_pool, tuple, cols, row_idx, cols_next_val_idx));
-        TupleRow* tuple_row = row_batch->get_row(row_batch->add_row());
-        tuple_row->set_tuple(0, tuple);
-        if (ExecNode::eval_conjuncts(_conjunct_ctxs.data(), _conjunct_ctxs.size(), tuple_row)) {
-            row_batch->commit_last_row();
-            tuple = reinterpret_cast<Tuple*>(reinterpret_cast<uint8_t*>(tuple) +
-                                             _tuple_desc->byte_size());
-            ++_num_rows_returned;
-        }
-    }
-
-    VLOG_CRITICAL << "finish one batch: num_rows=" << row_batch->num_rows();
-    COUNTER_SET(_rows_returned_counter, _num_rows_returned);
-    if (result.__isset.eos && result.eos) {
-        VLOG_CRITICAL << "es finish one scan_range: scan_range_idx=" << _scan_range_idx;
-        ++_scan_range_idx;
-    }
-    if (_scan_range_idx == _scan_ranges.size()) {
-        *eos = true;
-    }
-
-    return Status::OK();
-}
-
-Status EsScanNode::close(RuntimeState* state) {
-    if (is_closed()) return Status::OK();
-    VLOG_CRITICAL << "EsScanNode::Close";
-    RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::CLOSE));
-    SCOPED_TIMER(_runtime_profile->total_time_counter());
-    Expr::close(_pushdown_conjunct_ctxs, state);
-    RETURN_IF_ERROR(ExecNode::close(state));
-    for (int i = 0; i < _addresses.size(); ++i) {
-        TExtCloseParams params;
-        params.__set_scan_handle(_scan_handles[i]);
-        TExtCloseResult result;
-
-#ifndef BE_TEST
-        const TNetworkAddress& address = _addresses[i];
-        try {
-            Status status;
-            ExtDataSourceServiceClientCache* client_cache = _env->extdatasource_client_cache();
-            ExtDataSourceServiceConnection client(client_cache, address, 10000, &status);
-            if (!status.ok()) {
-                LOG(WARNING) << "es create client error: scan_range_idx=" << i
-                             << ", address=" << address << ", msg=" << status.get_error_msg();
-                return status;
-            }
-
-            try {
-                VLOG_CRITICAL << "es close param=" << apache::thrift::ThriftDebugString(params);
-                client->close(result, params);
-            } catch (apache::thrift::transport::TTransportException& e) {
-                LOG(WARNING) << "es close retrying, because: " << e.what();
-                RETURN_IF_ERROR(client.reopen());
-                client->close(result, params);
-            }
-        } catch (apache::thrift::TException& e) {
-            std::stringstream ss;
-            ss << "es close error: scan_range_idx=" << i << ", msg=" << e.what();
-            LOG(WARNING) << ss.str();
-            return Status::ThriftRpcError(ss.str());
-        }
-
-        VLOG_CRITICAL << "es close result=" << apache::thrift::ThriftDebugString(result);
-        Status status(result.status);
-        if (!status.ok()) {
-            LOG(WARNING) << "es close error: : scan_range_idx=" << i
-                         << ", msg=" << status.get_error_msg();
-            return status;
-        }
-#else
-        TStatus status;
-        result.__set_status(status);
-#endif
-    }
-
-    return Status::OK();
-}
-
-void EsScanNode::debug_string(int indentation_level, std::stringstream* out) const {
-    *out << string(indentation_level * 2, ' ');
-    *out << "EsScanNode(tupleid=" << _tuple_id;
-    *out << ")" << std::endl;
-
-    for (int i = 0; i < _children.size(); ++i) {
-        _children[i]->debug_string(indentation_level + 1, out);
-    }
-}
-
-Status EsScanNode::set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) {
-    for (int i = 0; i < scan_ranges.size(); ++i) {
-        TScanRangeParams scan_range = scan_ranges[i];
-        DCHECK(scan_range.scan_range.__isset.es_scan_range);
-        TEsScanRange es_scan_range = scan_range.scan_range.es_scan_range;
-        _scan_ranges.push_back(es_scan_range);
-    }
-
-    _offsets.resize(scan_ranges.size(), 0);
-    return Status::OK();
-}
-
-Status EsScanNode::open_es(TNetworkAddress& address, TExtOpenResult& result,
-                           TExtOpenParams& params) {
-    VLOG_CRITICAL << "es open param=" << apache::thrift::ThriftDebugString(params);
-#ifndef BE_TEST
-    try {
-        ExtDataSourceServiceClientCache* client_cache = _env->extdatasource_client_cache();
-        Status status;
-        ExtDataSourceServiceConnection client(client_cache, address, 10000, &status);
-        if (!status.ok()) {
-            std::stringstream ss;
-            ss << "es create client error: address=" << address
-               << ", msg=" << status.get_error_msg();
-            return Status::InternalError(ss.str());
-        }
-
-        try {
-            client->open(result, params);
-        } catch (apache::thrift::transport::TTransportException& e) {
-            LOG(WARNING) << "es open retrying, because: " << e.what();
-            RETURN_IF_ERROR(client.reopen());
-            client->open(result, params);
-        }
-        VLOG_CRITICAL << "es open result=" << apache::thrift::ThriftDebugString(result);
-        return Status(result.status);
-    } catch (apache::thrift::TException& e) {
-        std::stringstream ss;
-        ss << "es open error: address=" << address << ", msg=" << e.what();
-        return Status::InternalError(ss.str());
-    }
-#else
-    TStatus status;
-    result.__set_status(status);
-    result.__set_scan_handle("0");
-    return Status(status);
-#endif
-}
-
-// legacy conjuncts must not contain match function
-bool EsScanNode::check_left_conjuncts(Expr* conjunct) {
-    if (is_match_func(conjunct)) {
-        return false;
-    } else {
-        int num_children = conjunct->get_num_children();
-        for (int child_idx = 0; child_idx < num_children; ++child_idx) {
-            if (!check_left_conjuncts(conjunct->get_child(child_idx))) {
-                return false;
-            }
-        }
-        return true;
-    }
-}
-
-bool EsScanNode::ignore_cast(SlotDescriptor* slot, Expr* expr) {
-    if (slot->type().is_date_type() && expr->type().is_date_type()) {
-        return true;
-    }
-    if (slot->type().is_string_type() && expr->type().is_string_type()) {
-        return true;
-    }
-    return false;
-}
-
-bool EsScanNode::get_disjuncts(ExprContext* context, Expr* conjunct,
-                               std::vector<TExtPredicate>& disjuncts) {
-    if (TExprNodeType::BINARY_PRED == conjunct->node_type()) {
-        if (conjunct->children().size() != 2) {
-            VLOG_CRITICAL << "get disjuncts fail: number of children is not 2";
-            return false;
-        }
-        SlotRef* slotRef;
-        TExprOpcode::type op;
-        Expr* expr;
-        if (TExprNodeType::SLOT_REF == conjunct->get_child(0)->node_type()) {
-            expr = conjunct->get_child(1);
-            slotRef = (SlotRef*)(conjunct->get_child(0));
-            op = conjunct->op();
-        } else if (TExprNodeType::SLOT_REF == conjunct->get_child(1)->node_type()) {
-            expr = conjunct->get_child(0);
-            slotRef = (SlotRef*)(conjunct->get_child(1));
-            op = conjunct->op();
-        } else {
-            VLOG_CRITICAL << "get disjuncts fail: no SLOT_REF child";
-            return false;
-        }
-
-        SlotDescriptor* slot_desc = get_slot_desc(slotRef);
-        if (slot_desc == nullptr) {
-            VLOG_CRITICAL << "get disjuncts fail: slot_desc is null";
-            return false;
-        }
-
-        TExtLiteral literal;
-        if (!to_ext_literal(context, expr, &literal)) {
-            VLOG_CRITICAL << "get disjuncts fail: can't get literal, node_type="
-                          << expr->node_type();
-            return false;
-        }
-
-        TExtColumnDesc columnDesc;
-        columnDesc.__set_name(slot_desc->col_name());
-        columnDesc.__set_type(slot_desc->type().to_thrift());
-        TExtBinaryPredicate binaryPredicate;
-        binaryPredicate.__set_col(columnDesc);
-        binaryPredicate.__set_op(op);
-        binaryPredicate.__set_value(std::move(literal));
-        TExtPredicate predicate;
-        predicate.__set_node_type(TExprNodeType::BINARY_PRED);
-        predicate.__set_binary_predicate(binaryPredicate);
-        disjuncts.push_back(std::move(predicate));
-        return true;
-    } else if (is_match_func(conjunct)) {
-        // if this is a function call expr and function name is match, then push
-        // down it to es
-        TExtFunction match_function;
-        match_function.__set_func_name(conjunct->fn().name.function_name);
-        std::vector<TExtLiteral> query_conditions;
-
-        TExtLiteral literal;
-        if (!to_ext_literal(context, conjunct->get_child(1), &literal)) {
-            VLOG_CRITICAL << "get disjuncts fail: can't get literal, node_type="
-                          << conjunct->get_child(1)->node_type();
-            return false;
-        }
-
-        query_conditions.push_back(std::move(literal));
-        match_function.__set_values(query_conditions);
-        TExtPredicate predicate;
-        predicate.__set_node_type(TExprNodeType::FUNCTION_CALL);
-        predicate.__set_ext_function(match_function);
-        disjuncts.push_back(std::move(predicate));
-        return true;
-    } else if (TExprNodeType::IN_PRED == conjunct->node_type()) {
-        // the op code maybe FILTER_NEW_IN, it means there is function in list
-        // like col_a in (abs(1))
-        if (TExprOpcode::FILTER_IN != conjunct->op() &&
-            TExprOpcode::FILTER_NOT_IN != conjunct->op()) {
-            return false;
-        }
-        TExtInPredicate ext_in_predicate;
-        std::vector<TExtLiteral> in_pred_values;
-        InPredicate* pred = static_cast<InPredicate*>(conjunct);
-        ext_in_predicate.__set_is_not_in(pred->is_not_in());
-        if (Expr::type_without_cast(pred->get_child(0)) != TExprNodeType::SLOT_REF) {
-            return false;
-        }
-
-        SlotRef* slot_ref = (SlotRef*)(conjunct->get_child(0));
-        SlotDescriptor* slot_desc = get_slot_desc(slot_ref);
-        if (slot_desc == nullptr) {
-            return false;
-        }
-        TExtColumnDesc columnDesc;
-        columnDesc.__set_name(slot_desc->col_name());
-        columnDesc.__set_type(slot_desc->type().to_thrift());
-        ext_in_predicate.__set_col(columnDesc);
-
-        if (pred->get_child(0)->type().type != slot_desc->type().type) {
-            if (!ignore_cast(slot_desc, pred->get_child(0))) {
-                return false;
-            }
-        }
-
-        HybridSetBase::IteratorBase* iter = pred->hybrid_set()->begin();
-        while (iter->has_next()) {
-            if (nullptr == iter->get_value()) {
-                return false;
-            }
-            TExtLiteral literal;
-            if (!to_ext_literal(slot_desc->type().type, const_cast<void*>(iter->get_value()),
-                                &literal)) {
-                VLOG_CRITICAL << "get disjuncts fail: can't get literal, node_type="
-                              << slot_desc->type().type;
-                return false;
-            }
-            in_pred_values.push_back(literal);
-            iter->next();
-        }
-        ext_in_predicate.__set_values(in_pred_values);
-        TExtPredicate predicate;
-        predicate.__set_node_type(TExprNodeType::IN_PRED);
-        predicate.__set_in_predicate(ext_in_predicate);
-        disjuncts.push_back(std::move(predicate));
-        return true;
-    } else if (TExprNodeType::COMPOUND_PRED == conjunct->node_type()) {
-        if (TExprOpcode::COMPOUND_OR != conjunct->op()) {
-            VLOG_CRITICAL << "get disjuncts fail: op is not COMPOUND_OR";
-            return false;
-        }
-        if (!get_disjuncts(context, conjunct->get_child(0), disjuncts)) {
-            return false;
-        }
-        if (!get_disjuncts(context, conjunct->get_child(1), disjuncts)) {
-            return false;
-        }
-        return true;
-    } else {
-        VLOG_CRITICAL << "get disjuncts fail: node type is " << conjunct->node_type()
-                      << ", should be BINARY_PRED or COMPOUND_PRED";
-        return false;
-    }
-}
-
-bool EsScanNode::is_match_func(Expr* conjunct) {
-    if (TExprNodeType::FUNCTION_CALL == conjunct->node_type() &&
-        conjunct->fn().name.function_name == "esquery") {
-        return true;
-    }
-    return false;
-}
-
-SlotDescriptor* EsScanNode::get_slot_desc(SlotRef* slotRef) {
-    std::vector<SlotId> slot_ids;
-    slotRef->get_slot_ids(&slot_ids);
-    SlotDescriptor* slot_desc = nullptr;
-    for (SlotDescriptor* slot : _tuple_desc->slots()) {
-        if (slot->id() == slot_ids[0]) {
-            slot_desc = slot;
-            break;
-        }
-    }
-    return slot_desc;
-}
-
-bool EsScanNode::to_ext_literal(ExprContext* context, Expr* expr, TExtLiteral* literal) {
-    switch (expr->node_type()) {
-    case TExprNodeType::BOOL_LITERAL:
-    case TExprNodeType::INT_LITERAL:
-    case TExprNodeType::LARGE_INT_LITERAL:
-    case TExprNodeType::FLOAT_LITERAL:
-    case TExprNodeType::DECIMAL_LITERAL:
-    case TExprNodeType::STRING_LITERAL:
-    case TExprNodeType::DATE_LITERAL:
-        return to_ext_literal(expr->type().type, context->get_value(expr, nullptr), literal);
-    default:
-        return false;
-    }
-}
-
-bool EsScanNode::to_ext_literal(PrimitiveType slot_type, void* value, TExtLiteral* literal) {
-    TExprNodeType::type node_type;
-    switch (slot_type) {
-    case TYPE_BOOLEAN: {
-        node_type = (TExprNodeType::BOOL_LITERAL);
-        TBoolLiteral bool_literal;
-        bool_literal.__set_value(*reinterpret_cast<bool*>(value));
-        literal->__set_bool_literal(bool_literal);
-        break;
-    }
-
-    case TYPE_TINYINT: {
-        node_type = (TExprNodeType::INT_LITERAL);
-        TIntLiteral int_literal;
-        int_literal.__set_value(*reinterpret_cast<int8_t*>(value));
-        literal->__set_int_literal(int_literal);
-        break;
-    }
-    case TYPE_SMALLINT: {
-        node_type = (TExprNodeType::INT_LITERAL);
-        TIntLiteral int_literal;
-        int_literal.__set_value(*reinterpret_cast<int16_t*>(value));
-        literal->__set_int_literal(int_literal);
-        break;
-    }
-    case TYPE_INT: {
-        node_type = (TExprNodeType::INT_LITERAL);
-        TIntLiteral int_literal;
-        int_literal.__set_value(*reinterpret_cast<int32_t*>(value));
-        literal->__set_int_literal(int_literal);
-        break;
-    }
-    case TYPE_BIGINT: {
-        node_type = (TExprNodeType::INT_LITERAL);
-        TIntLiteral int_literal;
-        int_literal.__set_value(*reinterpret_cast<int64_t*>(value));
-        literal->__set_int_literal(int_literal);
-        break;
-    }
-
-    case TYPE_LARGEINT: {
-        node_type = (TExprNodeType::LARGE_INT_LITERAL);
-        TLargeIntLiteral large_int_literal;
-        large_int_literal.__set_value(
-                LargeIntValue::to_string(*reinterpret_cast<__int128*>(value)));
-        literal->__set_large_int_literal(large_int_literal);
-        break;
-    }
-
-    case TYPE_FLOAT: {
-        node_type = (TExprNodeType::FLOAT_LITERAL);
-        TFloatLiteral float_literal;
-        float_literal.__set_value(*reinterpret_cast<float*>(value));
-        literal->__set_float_literal(float_literal);
-        break;
-    }
-    case TYPE_DOUBLE: {
-        node_type = (TExprNodeType::FLOAT_LITERAL);
-        TFloatLiteral float_literal;
-        float_literal.__set_value(*reinterpret_cast<double*>(value));
-        literal->__set_float_literal(float_literal);
-        break;
-    }
-
-    case TYPE_DATE:
-    case TYPE_DATETIME: {
-        node_type = (TExprNodeType::DATE_LITERAL);
-        const DateTimeValue date_value = *reinterpret_cast<DateTimeValue*>(value);
-        char str[MAX_DTVALUE_STR_LEN];
-        date_value.to_string(str);
-        TDateLiteral date_literal;
-        date_literal.__set_value(str);
-        literal->__set_date_literal(date_literal);
-        break;
-    }
-
-    case TYPE_CHAR:
-    case TYPE_VARCHAR:
-    case TYPE_STRING: {
-        node_type = (TExprNodeType::STRING_LITERAL);
-        TStringLiteral string_literal;
-        string_literal.__set_value((reinterpret_cast<StringValue*>(value))->debug_string());
-        literal->__set_string_literal(string_literal);
-        break;
-    }
-
-    default: {
-        DCHECK(false) << "Invalid type.";
-        return false;
-    }
-    }
-    literal->__set_node_type(node_type);
-    return true;
-}
-
-Status EsScanNode::get_next_from_es(TExtGetNextResult& result) {
-    TExtGetNextParams params;
-    params.__set_scan_handle(_scan_handles[_scan_range_idx]);
-    params.__set_offset(_offsets[_scan_range_idx]);
-
-    // getNext
-    const TNetworkAddress& address = _addresses[_scan_range_idx];
-#ifndef BE_TEST
-    try {
-        Status create_client_status;
-        ExtDataSourceServiceClientCache* client_cache = _env->extdatasource_client_cache();
-        ExtDataSourceServiceConnection client(client_cache, address, 10000, &create_client_status);
-        if (!create_client_status.ok()) {
-            LOG(WARNING) << "es create client error: scan_range_idx=" << _scan_range_idx
-                         << ", address=" << address
-                         << ", msg=" << create_client_status.get_error_msg();
-            return create_client_status;
-        }
-
-        try {
-            VLOG_CRITICAL << "es get_next param=" << apache::thrift::ThriftDebugString(params);
-            client->getNext(result, params);
-        } catch (apache::thrift::transport::TTransportException& e) {
-            std::stringstream ss;
-            ss << "es get_next error: scan_range_idx=" << _scan_range_idx << ", msg=" << e.what();
-            LOG(WARNING) << ss.str();
-            RETURN_IF_ERROR(client.reopen());
-            return Status::ThriftRpcError(ss.str());
-        }
-    } catch (apache::thrift::TException& e) {
-        std::stringstream ss;
-        ss << "es get_next error: scan_range_idx=" << _scan_range_idx << ", msg=" << e.what();
-        LOG(WARNING) << ss.str();
-        return Status::ThriftRpcError(ss.str());
-    }
-#else
-    TStatus status;
-    result.__set_status(status);
-    result.__set_eos(true);
-    TExtColumnData col_data;
-    std::vector<bool> is_null;
-    is_null.push_back(false);
-    col_data.__set_is_null(is_null);
-    std::vector<int32_t> int_vals;
-    int_vals.push_back(1);
-    int_vals.push_back(2);
-    col_data.__set_int_vals(int_vals);
-    std::vector<TExtColumnData> cols;
-    cols.push_back(col_data);
-    TExtRowBatch rows;
-    rows.__set_cols(cols);
-    rows.__set_num_rows(2);
-    result.__set_rows(rows);
-    return Status(status);
-#endif
-
-    // check result
-    VLOG_CRITICAL << "es get_next result=" << apache::thrift::ThriftDebugString(result);
-    Status get_next_status(result.status);
-    if (!get_next_status.ok()) {
-        LOG(WARNING) << "es get_next error: scan_range_idx=" << _scan_range_idx
-                     << ", address=" << address << ", msg=" << get_next_status.get_error_msg();
-        return get_next_status;
-    }
-    if (!result.__isset.rows || !result.rows.__isset.num_rows) {
-        std::stringstream ss;
-        ss << "es get_next error: scan_range_idx=" << _scan_range_idx
-           << ", msg=rows or num_rows not in result";
-        LOG(WARNING) << ss.str();
-        return Status::InternalError(ss.str());
-    }
-
-    return Status::OK();
-}
-
-Status EsScanNode::materialize_row(MemPool* tuple_pool, Tuple* tuple,
-                                   const std::vector<TExtColumnData>& cols, int row_idx,
-                                   std::vector<int>& cols_next_val_idx) {
-    tuple->init(_tuple_desc->byte_size());
-
-    for (int i = 0; i < _tuple_desc->slots().size(); ++i) {
-        const SlotDescriptor* slot_desc = _tuple_desc->slots()[i];
-
-        if (!slot_desc->is_materialized()) {
-            continue;
-        }
-
-        void* slot = tuple->get_slot(slot_desc->tuple_offset());
-        const TExtColumnData& col = cols[i];
-
-        if (col.is_null[row_idx]) {
-            tuple->set_null(slot_desc->null_indicator_offset());
-            continue;
-        } else {
-            tuple->set_not_null(slot_desc->null_indicator_offset());
-        }
-
-        int val_idx = cols_next_val_idx[i]++;
-        switch (slot_desc->type().type) {
-        case TYPE_CHAR:
-        case TYPE_VARCHAR:
-        case TYPE_STRING: {
-            if (val_idx >= col.string_vals.size()) {
-                return Status::InternalError(strings::Substitute(ERROR_INVALID_COL_DATA, "STRING"));
-            }
-            const string& val = col.string_vals[val_idx];
-            size_t val_size = val.size();
-            Status rst;
-            char* buffer =
-                    reinterpret_cast<char*>(tuple_pool->try_allocate_unaligned(val_size, &rst));
-            if (UNLIKELY(buffer == nullptr)) {
-                std::string details = strings::Substitute(
-                        ERROR_MEM_LIMIT_EXCEEDED, "MaterializeNextRow", val_size, "string slot");
-                RETURN_LIMIT_EXCEEDED(tuple_pool->mem_tracker(), nullptr, details, val_size, rst);
-            }
-            memcpy(buffer, val.data(), val_size);
-            reinterpret_cast<StringValue*>(slot)->ptr = buffer;
-            reinterpret_cast<StringValue*>(slot)->len = val_size;
-            break;
-        }
-        case TYPE_TINYINT:
-            if (val_idx >= col.byte_vals.size()) {
-                return Status::InternalError(
-                        strings::Substitute(ERROR_INVALID_COL_DATA, "TINYINT"));
-            }
-            *reinterpret_cast<int8_t*>(slot) = col.byte_vals[val_idx];
-            break;
-        case TYPE_SMALLINT:
-            if (val_idx >= col.short_vals.size()) {
-                return Status::InternalError(
-                        strings::Substitute(ERROR_INVALID_COL_DATA, "SMALLINT"));
-            }
-            *reinterpret_cast<int16_t*>(slot) = col.short_vals[val_idx];
-            break;
-        case TYPE_INT:
-            if (val_idx >= col.int_vals.size()) {
-                return Status::InternalError(strings::Substitute(ERROR_INVALID_COL_DATA, "INT"));
-            }
-            *reinterpret_cast<int32_t*>(slot) = col.int_vals[val_idx];
-            break;
-        case TYPE_BIGINT:
-            if (val_idx >= col.long_vals.size()) {
-                return Status::InternalError(strings::Substitute(ERROR_INVALID_COL_DATA, "BIGINT"));
-            }
-            *reinterpret_cast<int64_t*>(slot) = col.long_vals[val_idx];
-            break;
-        case TYPE_LARGEINT:
-            if (val_idx >= col.long_vals.size()) {
-                return Status::InternalError(
-                        strings::Substitute(ERROR_INVALID_COL_DATA, "LARGEINT"));
-            }
-            *reinterpret_cast<int128_t*>(slot) = col.long_vals[val_idx];
-            break;
-        case TYPE_DOUBLE:
-            if (val_idx >= col.double_vals.size()) {
-                return Status::InternalError(strings::Substitute(ERROR_INVALID_COL_DATA, "DOUBLE"));
-            }
-            *reinterpret_cast<double*>(slot) = col.double_vals[val_idx];
-            break;
-        case TYPE_FLOAT:
-            if (val_idx >= col.double_vals.size()) {
-                return Status::InternalError(strings::Substitute(ERROR_INVALID_COL_DATA, "FLOAT"));
-            }
-            *reinterpret_cast<float*>(slot) = col.double_vals[val_idx];
-            break;
-        case TYPE_BOOLEAN:
-            if (val_idx >= col.bool_vals.size()) {
-                return Status::InternalError(
-                        strings::Substitute(ERROR_INVALID_COL_DATA, "BOOLEAN"));
-            }
-            *reinterpret_cast<int8_t*>(slot) = col.bool_vals[val_idx];
-            break;
-        case TYPE_DATE:
-            if (val_idx >= col.long_vals.size() ||
-                !reinterpret_cast<DateTimeValue*>(slot)->from_unixtime(col.long_vals[val_idx],
-                                                                       "+08:00")) {
-                return Status::InternalError(
-                        strings::Substitute(ERROR_INVALID_COL_DATA, "TYPE_DATE"));
-            }
-            reinterpret_cast<DateTimeValue*>(slot)->cast_to_date();
-            break;
-        case TYPE_DATETIME: {
-            if (val_idx >= col.long_vals.size() ||
-                !reinterpret_cast<DateTimeValue*>(slot)->from_unixtime(col.long_vals[val_idx],
-                                                                       "+08:00")) {
-                return Status::InternalError(
-                        strings::Substitute(ERROR_INVALID_COL_DATA, "TYPE_DATETIME"));
-            }
-            reinterpret_cast<DateTimeValue*>(slot)->set_type(TIME_DATETIME);
-            break;
-        }
-        default:
-            DCHECK(false);
-        }
-    }
-    return Status::OK();
-}
-
-} // namespace doris
diff --git a/be/src/exec/es_scan_node.h b/be/src/exec/es_scan_node.h
deleted file mode 100644
index 173cddaf68..0000000000
--- a/be/src/exec/es_scan_node.h
+++ /dev/null
@@ -1,88 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#pragma once
-
-#include <memory>
-#include <vector>
-
-#include "exec/scan_node.h"
-#include "exprs/slot_ref.h"
-#include "gen_cpp/PaloExternalDataSourceService_types.h"
-#include "gen_cpp/TExtDataSourceService.h"
-#include "runtime/descriptors.h"
-#include "runtime/exec_env.h"
-#include "runtime/tuple.h"
-
-namespace doris {
-
-class TupleDescriptor;
-class RuntimeState;
-class Status;
-
-class EsScanNode : public ScanNode {
-public:
-    EsScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
-    ~EsScanNode();
-
-    virtual Status prepare(RuntimeState* state) override;
-    virtual Status open(RuntimeState* state) override;
-    virtual Status get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) override;
-    virtual Status close(RuntimeState* state) override;
-    virtual Status set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) override;
-
-protected:
-    // Write debug string of this into out.
-    virtual void debug_string(int indentation_level, std::stringstream* out) const override;
-
-private:
-    Status open_es(TNetworkAddress& address, TExtOpenResult& result, TExtOpenParams& params);
-    Status materialize_row(MemPool* tuple_pool, Tuple* tuple, const vector<TExtColumnData>& cols,
-                           int next_row_idx, vector<int>& cols_next_val_idx);
-    Status get_next_from_es(TExtGetNextResult& result);
-
-    bool get_disjuncts(ExprContext* context, Expr* conjunct, vector<TExtPredicate>& disjuncts);
-    bool to_ext_literal(ExprContext* context, Expr* expr, TExtLiteral* literal);
-    bool to_ext_literal(PrimitiveType node_type, void* value, TExtLiteral* literal);
-    bool ignore_cast(SlotDescriptor* slot, Expr* expr);
-
-    bool is_match_func(Expr* conjunct);
-
-    SlotDescriptor* get_slot_desc(SlotRef* slotRef);
-
-    // check if open result meets condition
-    // 1. check if left conjuncts contain "match" function, since match function could only be executed on es
-    bool check_left_conjuncts(Expr* conjunct);
-
-private:
-    TupleId _tuple_id;
-    std::map<std::string, std::string> _properties;
-    const TupleDescriptor* _tuple_desc;
-    ExecEnv* _env;
-    std::vector<TEsScanRange> _scan_ranges;
-
-    // scan range's iterator, used in get_next()
-    int _scan_range_idx;
-
-    // store every scan range's netaddress/handle/offset
-    std::vector<TNetworkAddress> _addresses;
-    std::vector<std::string> _scan_handles;
-    std::vector<int> _offsets;
-    std::vector<ExprContext*> _pushdown_conjunct_ctxs;
-};
-
-} // namespace doris
diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp
index 4dbd827605..ccd8e3c854 100644
--- a/be/src/exec/exec_node.cpp
+++ b/be/src/exec/exec_node.cpp
@@ -33,7 +33,6 @@
 #include "exec/cross_join_node.h"
 #include "exec/empty_set_node.h"
 #include "exec/es_http_scan_node.h"
-#include "exec/es_scan_node.h"
 #include "exec/except_node.h"
 #include "exec/exchange_node.h"
 #include "exec/hash_join_node.h"
@@ -428,10 +427,6 @@ Status ExecNode::create_node(RuntimeState* state, ObjectPool* pool, const TPlanN
             *node = pool->add(new OdbcScanNode(pool, tnode, descs));
         return Status::OK();
 
-    case TPlanNodeType::ES_SCAN_NODE:
-        *node = pool->add(new EsScanNode(pool, tnode, descs));
-        return Status::OK();
-
     case TPlanNodeType::ES_HTTP_SCAN_NODE:
         if (state->enable_vectorized_exec()) {
             *node = pool->add(new vectorized::VEsHttpScanNode(pool, tnode, descs));
@@ -662,7 +657,6 @@ void ExecNode::collect_nodes(TPlanNodeType::type node_type, std::vector<ExecNode
 void ExecNode::collect_scan_nodes(vector<ExecNode*>* nodes) {
     collect_nodes(TPlanNodeType::OLAP_SCAN_NODE, nodes);
     collect_nodes(TPlanNodeType::BROKER_SCAN_NODE, nodes);
-    collect_nodes(TPlanNodeType::ES_SCAN_NODE, nodes);
     collect_nodes(TPlanNodeType::ES_HTTP_SCAN_NODE, nodes);
 }
 
diff --git a/be/src/exprs/expr_context.h b/be/src/exprs/expr_context.h
index 32d408e99d..7be7a8f15f 100644
--- a/be/src/exprs/expr_context.h
+++ b/be/src/exprs/expr_context.h
@@ -160,7 +160,6 @@ private:
     friend class RuntimePredicateWrapper;
     friend class BloomFilterPredicate;
     friend class OlapScanNode;
-    friend class EsScanNode;
     friend class EsPredicate;
 
     /// FunctionContexts for each registered expression. The FunctionContexts are created
diff --git a/be/src/gen_cpp/CMakeLists.txt b/be/src/gen_cpp/CMakeLists.txt
index 22aa8c9cfe..c4b2f0dc14 100644
--- a/be/src/gen_cpp/CMakeLists.txt
+++ b/be/src/gen_cpp/CMakeLists.txt
@@ -37,9 +37,6 @@ set(SRC_FILES
     ${GEN_CPP_DIR}/HeartbeatService_types.cpp
     ${GEN_CPP_DIR}/PaloInternalService_constants.cpp
     ${GEN_CPP_DIR}/PaloInternalService_types.cpp
-    ${GEN_CPP_DIR}/PaloExternalDataSourceService_constants.cpp
-    ${GEN_CPP_DIR}/PaloExternalDataSourceService_types.cpp
-    ${GEN_CPP_DIR}/TExtDataSourceService.cpp
     ${GEN_CPP_DIR}/FrontendService.cpp
     ${GEN_CPP_DIR}/FrontendService_constants.cpp
     ${GEN_CPP_DIR}/FrontendService_types.cpp
diff --git a/be/src/runtime/client_cache.h b/be/src/runtime/client_cache.h
index e29ecc632e..418917ab29 100644
--- a/be/src/runtime/client_cache.h
+++ b/be/src/runtime/client_cache.h
@@ -289,8 +289,5 @@ using FrontendServiceConnection = ClientConnection<FrontendServiceClient>;
 class TPaloBrokerServiceClient;
 using BrokerServiceClientCache = ClientCache<TPaloBrokerServiceClient>;
 using BrokerServiceConnection = ClientConnection<TPaloBrokerServiceClient>;
-class TExtDataSourceServiceClient;
-using ExtDataSourceServiceClientCache = ClientCache<TExtDataSourceServiceClient>;
-using ExtDataSourceServiceConnection = ClientConnection<TExtDataSourceServiceClient>;
 
 } // namespace doris
diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index 4a279d8142..6fda2e6a1f 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -64,7 +64,6 @@ class SmallFileMgr;
 class BackendServiceClient;
 class FrontendServiceClient;
 class TPaloBrokerServiceClient;
-class TExtDataSourceServiceClient;
 class PBackendService_Stub;
 class PFunctionService_Stub;
 
@@ -108,9 +107,6 @@ public:
     ClientCache<BackendServiceClient>* client_cache() { return _backend_client_cache; }
     ClientCache<FrontendServiceClient>* frontend_client_cache() { return _frontend_client_cache; }
     ClientCache<TPaloBrokerServiceClient>* broker_client_cache() { return _broker_client_cache; }
-    ClientCache<TExtDataSourceServiceClient>* extdatasource_client_cache() {
-        return _extdatasource_client_cache;
-    }
 
     // using template to simplify client cache management
     template <typename T>
@@ -184,7 +180,6 @@ private:
     ClientCache<BackendServiceClient>* _backend_client_cache = nullptr;
     ClientCache<FrontendServiceClient>* _frontend_client_cache = nullptr;
     ClientCache<TPaloBrokerServiceClient>* _broker_client_cache = nullptr;
-    ClientCache<TExtDataSourceServiceClient>* _extdatasource_client_cache = nullptr;
     ThreadResourceMgr* _thread_mgr = nullptr;
 
     // The ancestor for all querys tracker.
@@ -248,10 +243,5 @@ inline ClientCache<TPaloBrokerServiceClient>*
 ExecEnv::get_client_cache<TPaloBrokerServiceClient>() {
     return _broker_client_cache;
 }
-template <>
-inline ClientCache<TExtDataSourceServiceClient>*
-ExecEnv::get_client_cache<TExtDataSourceServiceClient>() {
-    return _extdatasource_client_cache;
-}
 
 } // namespace doris
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index 0c9a6edc84..ff5b847810 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -20,7 +20,6 @@
 #include "common/logging.h"
 #include "gen_cpp/BackendService.h"
 #include "gen_cpp/HeartbeatService_types.h"
-#include "gen_cpp/TExtDataSourceService.h"
 #include "gen_cpp/TPaloBrokerService.h"
 #include "olap/page_cache.h"
 #include "olap/segment_loader.h"
@@ -95,8 +94,6 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths) {
     _backend_client_cache = new BackendServiceClientCache(config::max_client_cache_size_per_host);
     _frontend_client_cache = new FrontendServiceClientCache(config::max_client_cache_size_per_host);
     _broker_client_cache = new BrokerServiceClientCache(config::max_client_cache_size_per_host);
-    _extdatasource_client_cache =
-            new ExtDataSourceServiceClientCache(config::max_client_cache_size_per_host);
     _task_pool_mem_tracker_registry.reset(new MemTrackerTaskPool());
     _thread_mgr = new ThreadResourceMgr();
     if (config::doris_enable_scanner_thread_pool_per_disk &&
@@ -148,7 +145,6 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths) {
     _backend_client_cache->init_metrics("backend");
     _frontend_client_cache->init_metrics("frontend");
     _broker_client_cache->init_metrics("broker");
-    _extdatasource_client_cache->init_metrics("extdatasource");
     _result_mgr->init();
     _cgroups_mgr->init_cgroups();
     _etl_job_mgr->init();
@@ -323,7 +319,6 @@ void ExecEnv::_destroy() {
     SAFE_DELETE(_scan_thread_pool);
     SAFE_DELETE(_thread_mgr);
     SAFE_DELETE(_broker_client_cache);
-    SAFE_DELETE(_extdatasource_client_cache);
     SAFE_DELETE(_frontend_client_cache);
     SAFE_DELETE(_backend_client_cache);
     SAFE_DELETE(_result_mgr);
diff --git a/be/src/util/thrift_rpc_helper.cpp b/be/src/util/thrift_rpc_helper.cpp
index 1083df6bb4..94ad060250 100644
--- a/be/src/util/thrift_rpc_helper.cpp
+++ b/be/src/util/thrift_rpc_helper.cpp
@@ -96,9 +96,4 @@ template Status ThriftRpcHelper::rpc<TPaloBrokerServiceClient>(
         const std::string& ip, const int32_t port,
         std::function<void(ClientConnection<TPaloBrokerServiceClient>&)> callback, int timeout_ms);
 
-template Status ThriftRpcHelper::rpc<TExtDataSourceServiceClient>(
-        const std::string& ip, const int32_t port,
-        std::function<void(ClientConnection<TExtDataSourceServiceClient>&)> callback,
-        int timeout_ms);
-
 } // namespace doris
diff --git a/be/test/CMakeLists.txt b/be/test/CMakeLists.txt
index 6b79813060..f39cb68926 100644
--- a/be/test/CMakeLists.txt
+++ b/be/test/CMakeLists.txt
@@ -86,7 +86,6 @@ set(EXEC_TEST_FILES
     # exec/schema_scanner/schema_collations_scanner_test.cpp
     # exec/schema_scanner/schema_charsets_scanner_test.cpp
     # exec/broker_reader_test.cpp
-    # exec/es_scan_node_test.cpp
 )
 
 if(DEFINED DORIS_WITH_LZO)
diff --git a/be/test/exec/es_scan_node_test.cpp b/be/test/exec/es_scan_node_test.cpp
deleted file mode 100644
index c647170f53..0000000000
--- a/be/test/exec/es_scan_node_test.cpp
+++ /dev/null
@@ -1,147 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "exec/es_scan_node.h"
-
-#include <gtest/gtest.h>
-
-#include <string>
-
-#include "common/object_pool.h"
-#include "gen_cpp/PlanNodes_types.h"
-#include "runtime/descriptors.h"
-#include "runtime/mem_pool.h"
-#include "runtime/row_batch.h"
-#include "runtime/runtime_state.h"
-#include "runtime/string_value.h"
-#include "runtime/tuple_row.h"
-#include "util/debug_util.h"
-#include "util/runtime_profile.h"
-
-using std::vector;
-
-namespace doris {
-
-// mock
-class EsScanNodeTest : public testing::Test {
-public:
-    EsScanNodeTest() : _runtime_state(TQueryGlobals()) {
-        _runtime_state._instance_mem_tracker.reset(new MemTracker());
-        TDescriptorTable t_desc_table;
-
-        // table descriptors
-        TTableDescriptor t_table_desc;
-
-        t_table_desc.id = 0;
-        t_table_desc.tableType = TTableType::ES_TABLE;
-        t_table_desc.numCols = 0;
-        t_table_desc.numClusteringCols = 0;
-        t_table_desc.__isset.esTable = true;
-        t_desc_table.tableDescriptors.push_back(t_table_desc);
-        t_desc_table.__isset.tableDescriptors = true;
-        // TSlotDescriptor
-        int offset = 1;
-        int i = 0;
-        // id
-        {
-            TSlotDescriptor t_slot_desc;
-            t_slot_desc.__set_slotType(TypeDescriptor(TYPE_INT).to_thrift());
-            t_slot_desc.__set_columnPos(i);
-            t_slot_desc.__set_byteOffset(offset);
-            t_slot_desc.__set_nullIndicatorByte(0);
-            t_slot_desc.__set_nullIndicatorBit(-1);
-            t_slot_desc.__set_slotIdx(i);
-            t_slot_desc.__set_isMaterialized(true);
-            t_desc_table.slotDescriptors.push_back(t_slot_desc);
-            offset += sizeof(int);
-        }
-
-        TTupleDescriptor t_tuple_desc;
-        t_tuple_desc.id = 0;
-        t_tuple_desc.byteSize = offset;
-        t_tuple_desc.numNullBytes = 1;
-        t_tuple_desc.tableId = 0;
-        t_tuple_desc.__isset.tableId = true;
-        t_desc_table.__isset.slotDescriptors = true;
-        t_desc_table.tupleDescriptors.push_back(t_tuple_desc);
-
-        DescriptorTbl::create(&_obj_pool, t_desc_table, &_desc_tbl);
-        _runtime_state.set_desc_tbl(_desc_tbl);
-
-        // Node Id
-        _tnode.node_id = 0;
-        _tnode.node_type = TPlanNodeType::SCHEMA_SCAN_NODE;
-        _tnode.num_children = 0;
-        _tnode.limit = -1;
-        _tnode.row_tuples.push_back(0);
-        _tnode.nullable_tuples.push_back(false);
-        _tnode.es_scan_node.tuple_id = 0;
-        std::map<std::string, std::string> properties;
-        _tnode.es_scan_node.__set_properties(properties);
-        _tnode.__isset.es_scan_node = true;
-    }
-
-protected:
-    virtual void SetUp() {}
-    virtual void TearDown() {}
-    TPlanNode _tnode;
-    ObjectPool _obj_pool;
-    DescriptorTbl* _desc_tbl;
-    RuntimeState _runtime_state;
-};
-
-TEST_F(EsScanNodeTest, normal_use) {
-    EsScanNode scan_node(&_obj_pool, _tnode, *_desc_tbl);
-    Status status = scan_node.prepare(&_runtime_state);
-    EXPECT_TRUE(status.ok());
-    TEsScanRange es_scan_range;
-    es_scan_range.__set_index("index1");
-    es_scan_range.__set_type("docs");
-    es_scan_range.__set_shard_id(0);
-    TNetworkAddress es_host;
-    es_host.__set_hostname("host");
-    es_host.__set_port(8200);
-    std::vector<TNetworkAddress> es_hosts;
-    es_hosts.push_back(es_host);
-    es_scan_range.__set_es_hosts(es_hosts);
-    TScanRange scan_range;
-    scan_range.__set_es_scan_range(es_scan_range);
-    TScanRangeParams scan_range_params;
-    scan_range_params.__set_scan_range(scan_range);
-    std::vector<TScanRangeParams> scan_ranges;
-    scan_ranges.push_back(scan_range_params);
-
-    status = scan_node.set_scan_ranges(scan_ranges);
-    EXPECT_TRUE(status.ok());
-    std::stringstream out;
-    scan_node.debug_string(1, &out);
-    LOG(WARNING) << out.str();
-
-    status = scan_node.open(&_runtime_state);
-    EXPECT_TRUE(status.ok());
-    RowBatch row_batch(scan_node._row_descriptor, _runtime_state.batch_size());
-    bool eos = false;
-    status = scan_node.get_next(&_runtime_state, &row_batch, &eos);
-    EXPECT_TRUE(status.ok());
-    EXPECT_EQ(2, row_batch.num_rows());
-    EXPECT_TRUE(eos);
-
-    status = scan_node.close(&_runtime_state);
-    EXPECT_TRUE(status.ok());
-}
-
-} // namespace doris
diff --git a/gensrc/thrift/PaloExternalDataSourceService.thrift b/gensrc/thrift/PaloExternalDataSourceService.thrift
deleted file mode 100644
index c4cce76d19..0000000000
--- a/gensrc/thrift/PaloExternalDataSourceService.thrift
+++ /dev/null
@@ -1,250 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-namespace java org.apache.doris.thrift
-namespace cpp doris
-
-include "Exprs.thrift"
-include "Opcodes.thrift"
-include "Status.thrift"
-include "Types.thrift"
-
-// A result set column descriptor. 
-// this definition id different from column desc in palo, the column desc in palo only support scalar type, does not support map, array
-// so that should convert palo column desc into ExtColumnDesc
-struct TExtColumnDesc {
-  // The column name as given in the Create .. statement. Always set.
-  1: optional string name
-  // The column type. Always set.
-  2: optional Types.TTypeDesc type
-}
-
-// Metadata used to describe the schema (column names, types, comments)
-// of result sets.
-struct TExtTableSchema {
-  // List of columns. Always set.
-  1: optional list<TExtColumnDesc> cols
-}
-
-struct TExtLiteral {
-  1: required Exprs.TExprNodeType node_type
-  2: optional Exprs.TBoolLiteral bool_literal
-  3: optional Exprs.TDateLiteral date_literal
-  4: optional Exprs.TFloatLiteral float_literal
-  5: optional Exprs.TIntLiteral int_literal
-  6: optional Exprs.TStringLiteral string_literal
-  7: optional Exprs.TDecimalLiteral decimal_literal
-  8: optional Exprs.TLargeIntLiteral large_int_literal
-}
-
-// Binary predicates that can be pushed to the external data source and
-// are of the form <col> <op> <val>. Sources can choose to accept or reject
-// predicates via the return value of prepare(), see TPrepareResult.
-// The column and the value are guaranteed to be type compatible in Impala,
-// but they are not necessarily the same type, so the data source
-// implementation may need to do an implicit cast.
-// > < = != >= <=
-struct TExtBinaryPredicate {
-  // Column on which the predicate is applied. Always set.
-  1: optional TExtColumnDesc col
-  // Comparison operator. Always set.
-  2: optional Opcodes.TExprOpcode op
-  // Value on the right side of the binary predicate. Always set.
-  3: optional TExtLiteral value
-}
-
-struct TExtInPredicate {
-  1: optional bool is_not_in
-  // Column on which the predicate is applied. Always set.
-  2: optional TExtColumnDesc col
-  // Value on the right side of the binary predicate. Always set.
-  3: optional list<TExtLiteral> values
-}
-
-struct TExtLikePredicate {
-  1: optional TExtColumnDesc col
-  2: optional TExtLiteral value
-}
-
-struct TExtIsNullPredicate {
-  1: optional bool is_not_null
-  2: optional TExtColumnDesc col
-}
-
-struct TExtFunction { 
-  1: optional string func_name
-  // input parameter column descs
-  2: optional list<TExtColumnDesc> cols
-  // input parameter column literals
-  3: optional list<TExtLiteral> values
-}
-
-// a union of all predicates
-struct TExtPredicate {
-  1: required Exprs.TExprNodeType node_type
-  2: optional TExtBinaryPredicate binary_predicate
-  3: optional TExtInPredicate in_predicate
-  4: optional TExtLikePredicate like_predicate
-  5: optional TExtIsNullPredicate is_null_predicate
-  6: optional TExtFunction ext_function
-}
-
-// A union over all possible return types for a column of data
-// Currently only used by ExternalDataSource types
-// 
-struct TExtColumnData {
-  // One element in the list for every row in the column indicating if there is
-  // a value in the vals list or a null.
-  1: required list<bool> is_null;
-
-  // Only one is set, only non-null values are set. this indicates one column data for a row batch
-  2: optional list<bool> bool_vals;
-  3: optional binary byte_vals;
-  4: optional list<i16> short_vals;
-  5: optional list<i32> int_vals;
-  6: optional list<i64> long_vals;
-  7: optional list<double> double_vals;
-  8: optional list<string> string_vals;
-  9: optional list<binary> binary_vals;
-}
-
-// Serialized batch of rows returned by getNext().
-// one row batch contains mult rows, and the result is arranged in column style
-struct TExtRowBatch {
-  // Each TColumnData contains the data for an entire column. Always set.
-  1: optional list<TExtColumnData> cols
-
-  // The number of rows returned. For count(*) queries, there may not be
-  // any materialized columns so cols will be an empty list and this value
-  // will indicate how many rows are returned. When there are materialized
-  // columns, this number should be the same as the size of each
-  // TColumnData.is_null list.
-  2: optional i64 num_rows
-}
-
-// Parameters to prepare().
-struct TExtPrepareParams {
-  // The name of the table. Always set.
-  1: optional string table_name
-
-  // A string specified for the table that is passed to the external data source.
-  // Always set, may be an empty string.
-  2: optional string init_string
-
-  // A list of conjunctive (AND) clauses, each of which contains a list of
-  // disjunctive (OR) binary predicates. Always set, may be an empty list.
-  3: optional list<list<TExtPredicate>> predicates
-}
-
-// Returned by prepare().
-struct TExtPrepareResult {
-  1: required Status.TStatus status
-
-  // Estimate of the total number of rows returned when applying the predicates indicated
-  // by accepted_conjuncts. Not set if the data source does not support providing
-  // this statistic.
-  2: optional i64 num_rows_estimate
-
-  // Accepted conjuncts. References the 'predicates' parameter in the prepare()
-  // call. It contains the 0-based indices of the top-level list elements (the
-  // AND elements) that the library will be able to apply during the scan. Those
-  // elements that aren’t referenced in accepted_conjuncts will be evaluated by
-  // Impala itself.
-  3: optional list<i32> accepted_conjuncts
-}
-
-// Parameters to open().
-struct TExtOpenParams {
-  // A unique identifier for the query. Always set.
-  1: optional Types.TUniqueId query_id
-
-  // The name of the table. Always set.
-  2: optional string table_name
-
-  // A string specified for the table that is passed to the external data source.
-  // Always set, may be an empty string.
-  3: optional map<string,string> properties    
-
-  // The authenticated user name. Always set.
-  4: optional string authenticated_user_name
-
-  // The schema of the rows that the scan needs to return. Always set.
-  5: optional TExtTableSchema row_schema
-
-  // The expected size of the row batches it returns in the subsequent getNext() calls.
-  // Always set.
-  6: optional i32 batch_size
-
-  7: optional list<list<TExtPredicate>> predicates
-
-  // The query limit, if specified.
-  8: optional i64 limit
-}
-
-// Returned by open().
-struct TExtOpenResult {
-  1: required Status.TStatus status
-
-  // An opaque handle used in subsequent getNext()/close() calls. Required.
-  2: optional string scan_handle
-  3: optional list<i32> accepted_conjuncts
-}
-
-// Parameters to getNext()
-struct TExtGetNextParams {
-  // The opaque handle returned by the previous open() call. Always set.
-  1: optional string scan_handle    // es search context id
-  2: optional i64 offset            // es should check the offset to prevent duplicate rpc calls
-}
-
-// Returned by getNext().
-struct TExtGetNextResult {
-  1: required Status.TStatus status
-
-  // If true, reached the end of the result stream; subsequent calls to
-  // getNext() won’t return any more results. Required.
-  2: optional bool eos
-
-  // A batch of rows to return, if any exist. The number of rows in the batch
-  // should be less than or equal to the batch_size specified in TOpenParams.
-  3: optional TExtRowBatch rows
-}
-
-// Parameters to close()
-struct TExtCloseParams {
-  // The opaque handle returned by the previous open() call. Always set.
-  1: optional string scan_handle
-}
-
-// Returned by close().
-struct TExtCloseResult {
-  1: required Status.TStatus status
-}
-
-// This data source can be considered as the entry of palo's unified external data source
-service TExtDataSourceService {
-    // 1. palo be call this api to send index, type, shard id to es
-    // 2. es will open a search context and prepare data, will return a context id
-    TExtOpenResult open(1: TExtOpenParams params);
-    // 1. palo be will send a search context id to es 
-    // 2. es will find the search context and find a batch rows and send to palo
-    // 3. palo will run the remaining predicates when receving data
-    // 4. es should check the offset when receive the request
-    TExtGetNextResult getNext(1: TExtGetNextParams params);
-    // 1. es will release the context when receiving the data
-    TExtCloseResult close(1: TExtCloseParams params);
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org