You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by zh...@apache.org on 2019/12/07 13:13:54 UTC

[incubator-doris] branch master updated: Push limit to Elasticsearch external table (#2400)

This is an automated email from the ASF dual-hosted git repository.

zhaoc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 5a3f71d  Push limit to Elasticsearch external table (#2400)
5a3f71d is described below

commit 5a3f71dd6bb76611cfed7a556d42d2b5c691a48c
Author: Yunfeng,Wu <wu...@baidu.com>
AuthorDate: Sat Dec 7 21:13:44 2019 +0800

    Push limit to Elasticsearch external table (#2400)
---
 be/src/exec/es/es_scan_reader.cpp   | 48 ++++++++++++++++++++++++++++---------
 be/src/exec/es/es_scan_reader.h     | 21 ++++++++++++++++
 be/src/exec/es/es_scroll_parser.cpp | 10 ++++----
 be/src/exec/es/es_scroll_parser.h   |  2 +-
 be/src/exec/es/es_scroll_query.cpp  |  7 +++++-
 be/src/exec/es_http_scan_node.cpp   |  5 ++++
 6 files changed, 76 insertions(+), 17 deletions(-)

diff --git a/be/src/exec/es/es_scan_reader.cpp b/be/src/exec/es/es_scan_reader.cpp
index ef97652..ff514d9 100644
--- a/be/src/exec/es/es_scan_reader.cpp
+++ b/be/src/exec/es/es_scan_reader.cpp
@@ -33,6 +33,8 @@ const std::string REQUEST_PREFERENCE_PREFIX = "&preference=_shards:";
 const std::string REQUEST_SEARCH_SCROLL_PATH = "/_search/scroll";
 const std::string REQUEST_SEPARATOR = "/";
 
+const std::string REQUEST_SEARCH_FILTER_PATH = "filter_path=hits.hits._source,hits.total,_id,hits.hits._source.fields,hits.hits.fields";
+
 ESScanReader::ESScanReader(const std::string& target, const std::map<std::string, std::string>& props) : _scroll_keep_alive(config::es_scroll_keepalive), _http_timeout_ms(config::es_http_timeout_ms) {
     _target = target;
     _index = props.at(KEY_INDEX);
@@ -49,11 +51,22 @@ ESScanReader::ESScanReader(const std::string& target, const std::map<std::string
     if (props.find(KEY_QUERY) != props.end()) {
         _query = props.at(KEY_QUERY);
     }
+
     std::string batch_size_str = props.at(KEY_BATCH_SIZE);
     _batch_size = atoi(batch_size_str.c_str());
-    _init_scroll_url = _target + REQUEST_SEPARATOR + _index + REQUEST_SEPARATOR + _type + "/_search?scroll=" + _scroll_keep_alive + REQUEST_PREFERENCE_PREFIX + _shards + "&" + REUQEST_SCROLL_FILTER_PATH;
-    _next_scroll_url = _target + REQUEST_SEARCH_SCROLL_PATH + "?" + REUQEST_SCROLL_FILTER_PATH;
+
+    if (props.find(KEY_TERMINATE_AFTER) != props.end()) {
+        _exactly_once = true;
+        // send a single normal search request to Elasticsearch with the additional terminate_after param, so the search terminates early when a limit is in effect
+        _search_url = _target + REQUEST_SEPARATOR + _index + REQUEST_SEPARATOR + _type + "/_search?terminate_after=" + props.at(KEY_TERMINATE_AFTER) + REQUEST_PREFERENCE_PREFIX + _shards + "&" + REQUEST_SEARCH_FILTER_PATH;
+    } else {
+        _exactly_once = false;
+        // no limit was pushed down: use scroll requests for scanning
+        _init_scroll_url = _target + REQUEST_SEPARATOR + _index + REQUEST_SEPARATOR + _type + "/_search?scroll=" + _scroll_keep_alive + REQUEST_PREFERENCE_PREFIX + _shards + "&" + REUQEST_SCROLL_FILTER_PATH;
+        _next_scroll_url = _target + REQUEST_SEARCH_SCROLL_PATH + "?" + REUQEST_SCROLL_FILTER_PATH;
+    }
     _eos = false;
+    
 }
 
 ESScanReader::~ESScanReader() {
@@ -61,8 +74,13 @@ ESScanReader::~ESScanReader() {
 
 Status ESScanReader::open() {
     _is_first = true;
-    RETURN_IF_ERROR(_network_client.init(_init_scroll_url));
-    LOG(INFO) << "First scroll request URL: " << _init_scroll_url;
+    if (_exactly_once) {
+        RETURN_IF_ERROR(_network_client.init(_search_url));
+        LOG(INFO) << "search request URL: " << _search_url;
+    } else {
+        RETURN_IF_ERROR(_network_client.init(_init_scroll_url));
+        LOG(INFO) << "First scroll request URL: " << _init_scroll_url;
+    }
     _network_client.set_basic_auth(_user_name, _passwd);
     _network_client.set_content_type("application/json");
     // phase open, we cached the first response for `get_next` phase
@@ -89,6 +107,9 @@ Status ESScanReader::get_next(bool* scan_eos, std::unique_ptr<ScrollParser>& scr
         response = _cached_response;
         _is_first = false;
     } else {
+        if (_exactly_once) {
+            return Status::OK();
+        }
         RETURN_IF_ERROR(_network_client.init(_next_scroll_url));
         _network_client.set_basic_auth(_user_name, _passwd);
         _network_client.set_content_type("application/json");
@@ -110,21 +131,26 @@ Status ESScanReader::get_next(bool* scan_eos, std::unique_ptr<ScrollParser>& scr
     }
 
     scroll_parser.reset(new ScrollParser());
-    Status status = scroll_parser->parse(response);
+    VLOG(1) << "get_next request ES, returned response: " << response;
+    Status status = scroll_parser->parse(response, _exactly_once);
     if (!status.ok()){
         _eos = true;
         LOG(WARNING) << status.get_error_msg();
         return status;
     }
 
-    _scroll_id = scroll_parser->get_scroll_id();
-    if (scroll_parser->get_total() == 0) {
+    // ES is requested exactly once, so the scan ends after this response
+    if (_exactly_once) {
         _eos = true;
-        return Status::OK();
-    }
-
-    _eos = scroll_parser->get_size() < _batch_size;
+    } else {
+        _scroll_id = scroll_parser->get_scroll_id();
+        if (scroll_parser->get_total() == 0) {
+            _eos = true;
+            return Status::OK();
+        }
 
+        _eos = scroll_parser->get_size() < _batch_size;
+    }
     *scan_eos = false;
     return Status::OK();
 }
diff --git a/be/src/exec/es/es_scan_reader.h b/be/src/exec/es/es_scan_reader.h
index edea638..60137c1 100644
--- a/be/src/exec/es/es_scan_reader.h
+++ b/be/src/exec/es/es_scan_reader.h
@@ -39,6 +39,7 @@ public:
     static constexpr const char* KEY_SHARD = "shard_id";
     static constexpr const char* KEY_QUERY = "query";
     static constexpr const char* KEY_BATCH_SIZE = "batch_size";
+    static constexpr const char* KEY_TERMINATE_AFTER = "limit";
     ESScanReader(const std::string& target, const std::map<std::string, std::string>& props);
     ~ESScanReader();
 
@@ -63,8 +64,26 @@ private:
     std::string _shards;
     // distinguish the first scroll phase and the following scroll
     bool _is_first;
+
+    // `_init_scroll_url` and `_next_scroll_url` are used for scrolling results from Elasticsearch.
+    // In order to use scrolling, the initial search request should specify the scroll parameter in the query string,
+    // which tells Elasticsearch how long it should keep the “search context” alive:
+    // {index}/{type}/_search?scroll=5m
     std::string _init_scroll_url;
+    // The result from the above request includes a _scroll_id, which should be passed to the scroll API in order to retrieve the next batch of results
+    // _next_scroll_url for the subsequent scroll request, like /_search/scroll
+    // POST /_search/scroll 
+    // {
+    //    "scroll" : "1m", 
+    //    "scroll_id" : "DXF1ZXJ5QW5kRmV0Y2gBAAAAAAAAAD4WYm9laVYtZndUQlNsdDcwakFMNjU1QQ==" 
+    // }
+    // Each call to the scroll API returns the next batch of results until there are no more results left to return
     std::string _next_scroll_url;
+    
+    // _search_url is used to execute exactly one search request against Elasticsearch
+    // _search_url takes effect when `limit` is specified:
+    // select * from es_table limit 10 -> /es_table/doc/_search?terminate_after=10
+    std::string _search_url;
     bool _eos;
     int _batch_size;
 
@@ -73,6 +92,8 @@ private:
     std::string _scroll_keep_alive;
     // timeout for es http connetion
     int _http_timeout_ms;
+
+    bool _exactly_once;
 };
 }
 
diff --git a/be/src/exec/es/es_scroll_parser.cpp b/be/src/exec/es/es_scroll_parser.cpp
index b605be3..c8271da 100644
--- a/be/src/exec/es/es_scroll_parser.cpp
+++ b/be/src/exec/es/es_scroll_parser.cpp
@@ -198,7 +198,7 @@ ScrollParser::ScrollParser() :
 ScrollParser::~ScrollParser() {
 }
 
-Status ScrollParser::parse(const std::string& scroll_result) {
+Status ScrollParser::parse(const std::string& scroll_result, bool exactly_once) {
     _document_node.Parse(scroll_result.c_str());
     if (_document_node.HasParseError()) {
         std::stringstream ss;
@@ -206,13 +206,15 @@ Status ScrollParser::parse(const std::string& scroll_result) {
         return Status::InternalError(ss.str());
     }
 
-    if (!_document_node.HasMember(FIELD_SCROLL_ID)) {
+    if (!exactly_once && !_document_node.HasMember(FIELD_SCROLL_ID)) {
         LOG(WARNING) << "Document has not a scroll id field scroll reponse:" << scroll_result;
         return Status::InternalError("Document has not a scroll id field");
     }
 
-    const rapidjson::Value &scroll_node = _document_node[FIELD_SCROLL_ID];
-    _scroll_id = scroll_node.GetString();
+    if (!exactly_once) {
+        const rapidjson::Value &scroll_node = _document_node[FIELD_SCROLL_ID];
+        _scroll_id = scroll_node.GetString();
+    }
     // { hits: { total : 2, "hits" : [ {}, {}, {} ]}}
     const rapidjson::Value &outer_hits_node = _document_node[FIELD_HITS];
     const rapidjson::Value &field_total = outer_hits_node[FIELD_TOTAL];
diff --git a/be/src/exec/es/es_scroll_parser.h b/be/src/exec/es/es_scroll_parser.h
index 29c5cf3..85ac92b 100644
--- a/be/src/exec/es/es_scroll_parser.h
+++ b/be/src/exec/es/es_scroll_parser.h
@@ -33,7 +33,7 @@ public:
     ScrollParser();
     ~ScrollParser();
 
-    Status parse(const std::string& scroll_result);
+    Status parse(const std::string& scroll_result, bool exactly_once = false);
     Status fill_tuple(const TupleDescriptor* _tuple_desc, Tuple* tuple, 
                 MemPool* mem_pool, bool* line_eof, const std::map<std::string, std::string>& docvalue_context);
 
diff --git a/be/src/exec/es/es_scroll_query.cpp b/be/src/exec/es/es_scroll_query.cpp
index 2cb6fdc..e9b88ee 100644
--- a/be/src/exec/es/es_scroll_query.cpp
+++ b/be/src/exec/es/es_scroll_query.cpp
@@ -107,7 +107,12 @@ std::string ESScrollQueryBuilder::build(const std::map<std::string, std::string>
         es_query_dsl.AddMember("_source", source_node, allocator);
     }
 
-    int size = atoi(properties.at(ESScanReader::KEY_BATCH_SIZE).c_str());
+    int size;
+    if (properties.find(ESScanReader::KEY_TERMINATE_AFTER) != properties.end()) {
+        size = atoi(properties.at(ESScanReader::KEY_TERMINATE_AFTER).c_str());
+    } else {
+        size = atoi(properties.at(ESScanReader::KEY_BATCH_SIZE).c_str());
+    }
     rapidjson::Value sort_node(rapidjson::kArrayType);
     // use the scroll-scan mode for scan index documents
     rapidjson::Value field("_doc", allocator);
diff --git a/be/src/exec/es_http_scan_node.cpp b/be/src/exec/es_http_scan_node.cpp
index 1f4fcc2..bf1121f 100644
--- a/be/src/exec/es_http_scan_node.cpp
+++ b/be/src/exec/es_http_scan_node.cpp
@@ -429,9 +429,14 @@ void EsHttpScanNode::scanner_worker(int start_idx, int length, std::promise<Stat
     properties[ESScanReader::KEY_SHARD] = std::to_string(es_scan_range.shard_id);
     properties[ESScanReader::KEY_BATCH_SIZE] = std::to_string(_runtime_state->batch_size());
     properties[ESScanReader::KEY_HOST_PORT] = get_host_port(es_scan_range.es_hosts);
+    // push down limit to Elasticsearch
+    if (limit() != -1 && limit() <= _runtime_state->batch_size()) {
+        properties[ESScanReader::KEY_TERMINATE_AFTER] = std::to_string(limit());
+    }
     properties[ESScanReader::KEY_QUERY] 
         = ESScrollQueryBuilder::build(properties, _column_names, _predicates, _docvalue_context);
 
+
     // start scanner to scan
     std::unique_ptr<EsHttpScanner> scanner(new EsHttpScanner(
                     _runtime_state, runtime_profile(), _tuple_id,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org