You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by zh...@apache.org on 2019/12/07 13:13:54 UTC
[incubator-doris] branch master updated: Push limit to
Elasticsearch external table (#2400)
This is an automated email from the ASF dual-hosted git repository.
zhaoc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 5a3f71d Push limit to Elasticsearch external table (#2400)
5a3f71d is described below
commit 5a3f71dd6bb76611cfed7a556d42d2b5c691a48c
Author: Yunfeng,Wu <wu...@baidu.com>
AuthorDate: Sat Dec 7 21:13:44 2019 +0800
Push limit to Elasticsearch external table (#2400)
---
be/src/exec/es/es_scan_reader.cpp | 48 ++++++++++++++++++++++++++++---------
be/src/exec/es/es_scan_reader.h | 21 ++++++++++++++++
be/src/exec/es/es_scroll_parser.cpp | 10 ++++----
be/src/exec/es/es_scroll_parser.h | 2 +-
be/src/exec/es/es_scroll_query.cpp | 7 +++++-
be/src/exec/es_http_scan_node.cpp | 5 ++++
6 files changed, 76 insertions(+), 17 deletions(-)
diff --git a/be/src/exec/es/es_scan_reader.cpp b/be/src/exec/es/es_scan_reader.cpp
index ef97652..ff514d9 100644
--- a/be/src/exec/es/es_scan_reader.cpp
+++ b/be/src/exec/es/es_scan_reader.cpp
@@ -33,6 +33,8 @@ const std::string REQUEST_PREFERENCE_PREFIX = "&preference=_shards:";
const std::string REQUEST_SEARCH_SCROLL_PATH = "/_search/scroll";
const std::string REQUEST_SEPARATOR = "/";
+const std::string REQUEST_SEARCH_FILTER_PATH = "filter_path=hits.hits._source,hits.total,_id,hits.hits._source.fields,hits.hits.fields";
+
ESScanReader::ESScanReader(const std::string& target, const std::map<std::string, std::string>& props) : _scroll_keep_alive(config::es_scroll_keepalive), _http_timeout_ms(config::es_http_timeout_ms) {
_target = target;
_index = props.at(KEY_INDEX);
@@ -49,11 +51,22 @@ ESScanReader::ESScanReader(const std::string& target, const std::map<std::string
if (props.find(KEY_QUERY) != props.end()) {
_query = props.at(KEY_QUERY);
}
+
std::string batch_size_str = props.at(KEY_BATCH_SIZE);
_batch_size = atoi(batch_size_str.c_str());
- _init_scroll_url = _target + REQUEST_SEPARATOR + _index + REQUEST_SEPARATOR + _type + "/_search?scroll=" + _scroll_keep_alive + REQUEST_PREFERENCE_PREFIX + _shards + "&" + REUQEST_SCROLL_FILTER_PATH;
- _next_scroll_url = _target + REQUEST_SEARCH_SCROLL_PATH + "?" + REUQEST_SCROLL_FILTER_PATH;
+
+ if (props.find(KEY_TERMINATE_AFTER) != props.end()) {
+ _exactly_once = true;
+ // just send a normal search against elasticsearch with an additional terminate_after param to achieve an early-termination effect when a limit takes effect
+ _search_url = _target + REQUEST_SEPARATOR + _index + REQUEST_SEPARATOR + _type + "/_search?terminate_after=" + props.at(KEY_TERMINATE_AFTER) + REQUEST_PREFERENCE_PREFIX + _shards + "&" + REQUEST_SEARCH_FILTER_PATH;
+ } else {
+ _exactly_once = false;
+ // scroll request for scanning
+ _init_scroll_url = _target + REQUEST_SEPARATOR + _index + REQUEST_SEPARATOR + _type + "/_search?scroll=" + _scroll_keep_alive + REQUEST_PREFERENCE_PREFIX + _shards + "&" + REUQEST_SCROLL_FILTER_PATH;
+ _next_scroll_url = _target + REQUEST_SEARCH_SCROLL_PATH + "?" + REUQEST_SCROLL_FILTER_PATH;
+ }
_eos = false;
+
}
ESScanReader::~ESScanReader() {
@@ -61,8 +74,13 @@ ESScanReader::~ESScanReader() {
Status ESScanReader::open() {
_is_first = true;
- RETURN_IF_ERROR(_network_client.init(_init_scroll_url));
- LOG(INFO) << "First scroll request URL: " << _init_scroll_url;
+ if (_exactly_once) {
+ RETURN_IF_ERROR(_network_client.init(_search_url));
+ LOG(INFO) << "search request URL: " << _search_url;
+ } else {
+ RETURN_IF_ERROR(_network_client.init(_init_scroll_url));
+ LOG(INFO) << "First scroll request URL: " << _init_scroll_url;
+ }
_network_client.set_basic_auth(_user_name, _passwd);
_network_client.set_content_type("application/json");
// phase open, we cached the first response for `get_next` phase
@@ -89,6 +107,9 @@ Status ESScanReader::get_next(bool* scan_eos, std::unique_ptr<ScrollParser>& scr
response = _cached_response;
_is_first = false;
} else {
+ if (_exactly_once) {
+ return Status::OK();
+ }
RETURN_IF_ERROR(_network_client.init(_next_scroll_url));
_network_client.set_basic_auth(_user_name, _passwd);
_network_client.set_content_type("application/json");
@@ -110,21 +131,26 @@ Status ESScanReader::get_next(bool* scan_eos, std::unique_ptr<ScrollParser>& scr
}
scroll_parser.reset(new ScrollParser());
- Status status = scroll_parser->parse(response);
+ VLOG(1) << "get_next request ES, returned response: " << response;
+ Status status = scroll_parser->parse(response, _exactly_once);
if (!status.ok()){
_eos = true;
LOG(WARNING) << status.get_error_msg();
return status;
}
- _scroll_id = scroll_parser->get_scroll_id();
- if (scroll_parser->get_total() == 0) {
+ // request ES just once
+ if (_exactly_once) {
_eos = true;
- return Status::OK();
- }
-
- _eos = scroll_parser->get_size() < _batch_size;
+ } else {
+ _scroll_id = scroll_parser->get_scroll_id();
+ if (scroll_parser->get_total() == 0) {
+ _eos = true;
+ return Status::OK();
+ }
+ _eos = scroll_parser->get_size() < _batch_size;
+ }
*scan_eos = false;
return Status::OK();
}
diff --git a/be/src/exec/es/es_scan_reader.h b/be/src/exec/es/es_scan_reader.h
index edea638..60137c1 100644
--- a/be/src/exec/es/es_scan_reader.h
+++ b/be/src/exec/es/es_scan_reader.h
@@ -39,6 +39,7 @@ public:
static constexpr const char* KEY_SHARD = "shard_id";
static constexpr const char* KEY_QUERY = "query";
static constexpr const char* KEY_BATCH_SIZE = "batch_size";
+ static constexpr const char* KEY_TERMINATE_AFTER = "limit";
ESScanReader(const std::string& target, const std::map<std::string, std::string>& props);
~ESScanReader();
@@ -63,8 +64,26 @@ private:
std::string _shards;
// distinguish the first scroll phase and the following scroll
bool _is_first;
+
+ // `_init_scroll_url` and `_next_scroll_url` are used for scrolling results from Elasticsearch
+ // In order to use scrolling, the initial search request should specify the scroll parameter in the query string,
+ // which tells Elasticsearch how long it should keep the “search context” alive:
+ // {index}/{type}/_search?scroll=5m
std::string _init_scroll_url;
+ // The result from the above request includes a _scroll_id, which should be passed to the scroll API in order to retrieve the next batch of results
+ // _next_scroll_url for the subsequent scroll request, like /_search/scroll
+ // POST /_search/scroll
+ // {
+ // "scroll" : "1m",
+ // "scroll_id" : "DXF1ZXJ5QW5kRmV0Y2gBAAAAAAAAAD4WYm9laVYtZndUQlNsdDcwakFMNjU1QQ=="
+ // }
+ // Each call to the scroll API returns the next batch of results until there are no more results left to return
std::string _next_scroll_url;
+
+ // _search_url is used to execute just one search request to Elasticsearch
+ // _search_url would go into effect when `limit` specified:
+ // select * from es_table limit 10 -> /es_table/doc/_search?terminate_after=10
+ std::string _search_url;
bool _eos;
int _batch_size;
@@ -73,6 +92,8 @@ private:
std::string _scroll_keep_alive;
// timeout for es http connection
int _http_timeout_ms;
+
+ bool _exactly_once;
};
}
diff --git a/be/src/exec/es/es_scroll_parser.cpp b/be/src/exec/es/es_scroll_parser.cpp
index b605be3..c8271da 100644
--- a/be/src/exec/es/es_scroll_parser.cpp
+++ b/be/src/exec/es/es_scroll_parser.cpp
@@ -198,7 +198,7 @@ ScrollParser::ScrollParser() :
ScrollParser::~ScrollParser() {
}
-Status ScrollParser::parse(const std::string& scroll_result) {
+Status ScrollParser::parse(const std::string& scroll_result, bool exactly_once) {
_document_node.Parse(scroll_result.c_str());
if (_document_node.HasParseError()) {
std::stringstream ss;
@@ -206,13 +206,15 @@ Status ScrollParser::parse(const std::string& scroll_result) {
return Status::InternalError(ss.str());
}
- if (!_document_node.HasMember(FIELD_SCROLL_ID)) {
+ if (!exactly_once && !_document_node.HasMember(FIELD_SCROLL_ID)) {
LOG(WARNING) << "Document has not a scroll id field scroll reponse:" << scroll_result;
return Status::InternalError("Document has not a scroll id field");
}
- const rapidjson::Value &scroll_node = _document_node[FIELD_SCROLL_ID];
- _scroll_id = scroll_node.GetString();
+ if (!exactly_once) {
+ const rapidjson::Value &scroll_node = _document_node[FIELD_SCROLL_ID];
+ _scroll_id = scroll_node.GetString();
+ }
// { hits: { total : 2, "hits" : [ {}, {}, {} ]}}
const rapidjson::Value &outer_hits_node = _document_node[FIELD_HITS];
const rapidjson::Value &field_total = outer_hits_node[FIELD_TOTAL];
diff --git a/be/src/exec/es/es_scroll_parser.h b/be/src/exec/es/es_scroll_parser.h
index 29c5cf3..85ac92b 100644
--- a/be/src/exec/es/es_scroll_parser.h
+++ b/be/src/exec/es/es_scroll_parser.h
@@ -33,7 +33,7 @@ public:
ScrollParser();
~ScrollParser();
- Status parse(const std::string& scroll_result);
+ Status parse(const std::string& scroll_result, bool exactly_once = false);
Status fill_tuple(const TupleDescriptor* _tuple_desc, Tuple* tuple,
MemPool* mem_pool, bool* line_eof, const std::map<std::string, std::string>& docvalue_context);
diff --git a/be/src/exec/es/es_scroll_query.cpp b/be/src/exec/es/es_scroll_query.cpp
index 2cb6fdc..e9b88ee 100644
--- a/be/src/exec/es/es_scroll_query.cpp
+++ b/be/src/exec/es/es_scroll_query.cpp
@@ -107,7 +107,12 @@ std::string ESScrollQueryBuilder::build(const std::map<std::string, std::string>
es_query_dsl.AddMember("_source", source_node, allocator);
}
- int size = atoi(properties.at(ESScanReader::KEY_BATCH_SIZE).c_str());
+ int size;
+ if (properties.find(ESScanReader::KEY_TERMINATE_AFTER) != properties.end()) {
+ size = atoi(properties.at(ESScanReader::KEY_TERMINATE_AFTER).c_str());
+ } else {
+ size = atoi(properties.at(ESScanReader::KEY_BATCH_SIZE).c_str());
+ }
rapidjson::Value sort_node(rapidjson::kArrayType);
// use the scroll-scan mode for scan index documents
rapidjson::Value field("_doc", allocator);
diff --git a/be/src/exec/es_http_scan_node.cpp b/be/src/exec/es_http_scan_node.cpp
index 1f4fcc2..bf1121f 100644
--- a/be/src/exec/es_http_scan_node.cpp
+++ b/be/src/exec/es_http_scan_node.cpp
@@ -429,9 +429,14 @@ void EsHttpScanNode::scanner_worker(int start_idx, int length, std::promise<Stat
properties[ESScanReader::KEY_SHARD] = std::to_string(es_scan_range.shard_id);
properties[ESScanReader::KEY_BATCH_SIZE] = std::to_string(_runtime_state->batch_size());
properties[ESScanReader::KEY_HOST_PORT] = get_host_port(es_scan_range.es_hosts);
+ // push down limit to Elasticsearch
+ if (limit() != -1 && limit() <= _runtime_state->batch_size()) {
+ properties[ESScanReader::KEY_TERMINATE_AFTER] = std::to_string(limit());
+ }
properties[ESScanReader::KEY_QUERY]
= ESScrollQueryBuilder::build(properties, _column_names, _predicates, _docvalue_context);
+
// start scanner to scan
std::unique_ptr<EsHttpScanner> scanner(new EsHttpScanner(
_runtime_state, runtime_profile(), _tuple_id,
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org