You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by zh...@apache.org on 2020/04/13 15:07:40 UTC
[incubator-doris] branch master updated: [ES Connector] Add field
context for string field keyword type (#3305)
This is an automated email from the ASF dual-hosted git repository.
zhaoc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new a467c6f [ES Connector] Add field context for string field keyword type (#3305)
a467c6f is described below
commit a467c6f81f5c7bd81c94401ee75ad84ab1dc4739
Author: Yunfeng,Wu <wu...@baidu.com>
AuthorDate: Mon Apr 13 23:07:33 2020 +0800
[ES Connector] Add field context for string field keyword type (#3305)
This PR is only a transitional approach; it would be better to move the predicate transformation from Doris BE to Doris FE, so that Doris BE is only responsible for fetching data from ES.
Add an `enable_keyword_sniff` configuration item for creating an external Elasticsearch table. It defaults to true. When enabled, Doris sniffs the `keyword` sub-field of an analyzed `text` field and uses its JSON path (e.g. `k1.keyword`) in place of the original column name when generating predicates.
```
CREATE EXTERNAL TABLE `test` (
`k1` varchar(20) COMMENT "",
`create_time` datetime COMMENT ""
) ENGINE=ELASTICSEARCH
PROPERTIES (
"hosts" = "http://10.74.167.16:8200",
"user" = "root",
"password" = "root",
"index" = "test",
"type" = "doc",
"enable_keyword_sniff" = "true"
);
```
Note: `enable_keyword_sniff` defaults to "true".
Run this SQL:
```
select * from test where k1 = "wu yun feng"
```
Output predicate DSL:
```
{"term":{"k1.keyword":"wu yun feng"}}
```
This PR also removes the Elasticsearch version detection logic, because it is currently unused; it may be needed again in the future.
---
be/src/exec/es/es_predicate.cpp | 27 +++++++--
be/src/exec/es/es_predicate.h | 5 ++
be/src/exec/es_http_scan_node.cpp | 5 ++
be/src/exec/es_http_scan_node.h | 1 +
.../java/org/apache/doris/catalog/Catalog.java | 5 +-
.../java/org/apache/doris/catalog/EsTable.java | 36 +++++++++++-
.../org/apache/doris/external/EsStateStore.java | 67 +++++++++++++---------
.../java/org/apache/doris/planner/EsScanNode.java | 3 +
gensrc/thrift/PlanNodes.thrift | 12 ++++
9 files changed, 126 insertions(+), 35 deletions(-)
diff --git a/be/src/exec/es/es_predicate.cpp b/be/src/exec/es/es_predicate.cpp
index 95c5724..c8528c8 100644
--- a/be/src/exec/es/es_predicate.cpp
+++ b/be/src/exec/es/es_predicate.cpp
@@ -253,9 +253,13 @@ Status EsPredicate::build_disjuncts_list(const Expr* conjunct) {
}
ExtLiteral literal(expr->type().type, _context->get_value(expr, NULL));
+ std::string col = slot_desc->col_name();
+ if (_field_context.find(col) != _field_context.end()) {
+ col = _field_context[col];
+ }
ExtPredicate* predicate = new ExtBinaryPredicate(
TExprNodeType::BINARY_PRED,
- slot_desc->col_name(),
+ col,
slot_desc->type(),
op,
literal);
@@ -298,8 +302,12 @@ Status EsPredicate::build_disjuncts_list(const Expr* conjunct) {
} else {
is_not_null = true;
}
+ std::string col = slot_desc->col_name();
+ if (_field_context.find(col) != _field_context.end()) {
+ col = _field_context[col];
+ }
// use TExprNodeType::IS_NULL_PRED for BooleanQueryBuilder translate
- ExtIsNullPredicate* predicate = new ExtIsNullPredicate(TExprNodeType::IS_NULL_PRED, slot_desc->col_name(), slot_desc->type(), is_not_null);
+ ExtIsNullPredicate* predicate = new ExtIsNullPredicate(TExprNodeType::IS_NULL_PRED, col, slot_desc->type(), is_not_null);
_disjuncts.push_back(predicate);
return Status::OK();
}
@@ -331,11 +339,14 @@ Status EsPredicate::build_disjuncts_list(const Expr* conjunct) {
if (type != TYPE_VARCHAR && type != TYPE_CHAR) {
return Status::InternalError("build disjuncts failed: like value is not a string");
}
-
+ std::string col = slot_desc->col_name();
+ if (_field_context.find(col) != _field_context.end()) {
+ col = _field_context[col];
+ }
ExtLiteral literal(type, _context->get_value(expr, NULL));
ExtPredicate* predicate = new ExtLikePredicate(
TExprNodeType::LIKE_PRED,
- slot_desc->col_name(),
+ col,
slot_desc->type(),
literal);
@@ -380,11 +391,14 @@ Status EsPredicate::build_disjuncts_list(const Expr* conjunct) {
in_pred_values.emplace_back(literal);
iter->next();
}
-
+ std::string col = slot_desc->col_name();
+ if (_field_context.find(col) != _field_context.end()) {
+ col = _field_context[col];
+ }
ExtPredicate* predicate = new ExtInPredicate(
TExprNodeType::IN_PRED,
pred->is_not_in(),
- slot_desc->col_name(),
+ col,
slot_desc->type(),
in_pred_values);
_disjuncts.push_back(predicate);
@@ -399,6 +413,7 @@ Status EsPredicate::build_disjuncts_list(const Expr* conjunct) {
std::vector<EsPredicate*> conjuncts;
for (int i = 0; i < conjunct->get_num_children(); ++i) {
EsPredicate *predicate = _pool->add(new EsPredicate(_context, _tuple_desc, _pool));
+ predicate->set_field_context(_field_context);
Status status = predicate->build_disjuncts_list(conjunct->children()[i]);
if (status.ok()) {
conjuncts.push_back(predicate);
diff --git a/be/src/exec/es/es_predicate.h b/be/src/exec/es/es_predicate.h
index 6e1958b..3618873 100644
--- a/be/src/exec/es/es_predicate.h
+++ b/be/src/exec/es/es_predicate.h
@@ -198,6 +198,10 @@ public:
return _es_query_status;
}
+ void set_field_context(const std::map<std::string, std::string>& field_context) {
+ _field_context = field_context;
+ }
+
private:
Status build_disjuncts_list(const Expr* conjunct);
bool is_match_func(const Expr* conjunct);
@@ -209,6 +213,7 @@ private:
std::vector<ExtPredicate*> _disjuncts;
Status _es_query_status;
ObjectPool *_pool;
+ std::map<std::string, std::string> _field_context;
};
}
diff --git a/be/src/exec/es_http_scan_node.cpp b/be/src/exec/es_http_scan_node.cpp
index c6c634b..cb5eeb2 100644
--- a/be/src/exec/es_http_scan_node.cpp
+++ b/be/src/exec/es_http_scan_node.cpp
@@ -59,6 +59,10 @@ Status EsHttpScanNode::init(const TPlanNode& tnode, RuntimeState* state) {
if (tnode.es_scan_node.__isset.docvalue_context) {
_docvalue_context = tnode.es_scan_node.docvalue_context;
}
+
+ if (tnode.es_scan_node.__isset.fields_context) {
+ _fields_context = tnode.es_scan_node.fields_context;
+ }
return Status::OK();
}
@@ -93,6 +97,7 @@ Status EsHttpScanNode::build_conjuncts_list() {
for (int i = 0; i < _conjunct_ctxs.size(); ++i) {
EsPredicate* predicate = _pool->add(
new EsPredicate(_conjunct_ctxs[i], _tuple_desc, _pool));
+ predicate->set_field_context(_fields_context);
status = predicate->build_disjuncts_list();
if (status.ok()) {
_predicates.push_back(predicate);
diff --git a/be/src/exec/es_http_scan_node.h b/be/src/exec/es_http_scan_node.h
index 9990a76..89cca57 100644
--- a/be/src/exec/es_http_scan_node.h
+++ b/be/src/exec/es_http_scan_node.h
@@ -100,6 +100,7 @@ private:
std::vector<std::promise<Status>> _scanners_status;
std::map<std::string, std::string> _properties;
std::map<std::string, std::string> _docvalue_context;
+ std::map<std::string, std::string> _fields_context;
std::vector<TScanRangeParams> _scan_ranges;
std::vector<std::string> _column_names;
diff --git a/fe/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/src/main/java/org/apache/doris/catalog/Catalog.java
index 38cd584..928b30b 100644
--- a/fe/src/main/java/org/apache/doris/catalog/Catalog.java
+++ b/fe/src/main/java/org/apache/doris/catalog/Catalog.java
@@ -3997,8 +3997,9 @@ public class Catalog {
sb.append("\"password\" = \"").append(hidePassword ? "" : esTable.getPasswd()).append("\",\n");
sb.append("\"index\" = \"").append(esTable.getIndexName()).append("\",\n");
sb.append("\"type\" = \"").append(esTable.getMappingType()).append("\",\n");
- sb.append("\"transport\" = \"").append(esTable.getTransport()).append("\"\n");
- sb.append("\"enable_docvalue_scan\" = \"").append(esTable.isDocValueScanEnable()).append("\"\n");
+ sb.append("\"transport\" = \"").append(esTable.getTransport()).append("\",\n");
+ sb.append("\"enable_docvalue_scan\" = \"").append(esTable.isDocValueScanEnable()).append("\",\n");
+ sb.append("\"enable_keyword_sniff\" = \"").append(esTable.isKeywordSniffEnable()).append("\"\n");
sb.append(")");
}
sb.append(";");
diff --git a/fe/src/main/java/org/apache/doris/catalog/EsTable.java b/fe/src/main/java/org/apache/doris/catalog/EsTable.java
index 2045d6c..252d962 100644
--- a/fe/src/main/java/org/apache/doris/catalog/EsTable.java
+++ b/fe/src/main/java/org/apache/doris/catalog/EsTable.java
@@ -56,6 +56,7 @@ public class EsTable extends Table {
public static final String TRANSPORT_HTTP = "http";
public static final String TRANSPORT_THRIFT = "thrift";
public static final String DOC_VALUE_SCAN = "enable_docvalue_scan";
+ public static final String KEYWORD_SNIFF = "enable_keyword_sniff";
private String hosts;
private String[] seeds;
@@ -69,6 +70,7 @@ public class EsTable extends Table {
private PartitionInfo partitionInfo;
private EsTableState esTableState;
private boolean enableDocValueScan = false;
+ private boolean enableKeywordSniff = true;
public EsMajorVersion majorVersion = null;
@@ -93,6 +95,8 @@ public class EsTable extends Table {
// use select city from table, if enable the docvalue, we will fetch the `city` field value from `city.raw`
private Map<String, String> docValueContext = new HashMap<>();
+ private Map<String, String> fieldsContext = new HashMap<>();
+
public EsTable() {
super(TableType.ELASTICSEARCH);
}
@@ -104,6 +108,13 @@ public class EsTable extends Table {
validate(properties);
}
+ public void addFetchField(String originName, String replaceName) {
+ fieldsContext.put(originName, replaceName);
+ }
+
+ public Map<String, String> fieldsContext() {
+ return fieldsContext;
+ }
public void addDocValueField(String name, String fieldsName) {
docValueContext.put(name, fieldsName);
@@ -117,6 +128,10 @@ public class EsTable extends Table {
return enableDocValueScan;
}
+ public boolean isKeywordSniffEnable() {
+ return enableKeywordSniff;
+ }
+
private void validate(Map<String, String> properties) throws DdlException {
if (properties == null) {
@@ -159,7 +174,7 @@ public class EsTable extends Table {
}
}
- // Explicit setting for cluster version to avoid detecting version failure
+ // enable doc value scan for Elasticsearch
if (properties.containsKey(DOC_VALUE_SCAN)) {
try {
enableDocValueScan = Boolean.parseBoolean(properties.get(DOC_VALUE_SCAN).trim());
@@ -172,6 +187,18 @@ public class EsTable extends Table {
enableDocValueScan = false;
}
+ if (properties.containsKey(KEYWORD_SNIFF)) {
+ try {
+ enableKeywordSniff = Boolean.parseBoolean(properties.get(KEYWORD_SNIFF).trim());
+ } catch (Exception e) {
+ throw new DdlException("fail to parse enable_keyword_sniff, enable_keyword_sniff= "
+ + properties.get(VERSION).trim() + " ,`enable_keyword_sniff`"
+ + " shoud be like 'true' or 'false', value should be double quotation marks");
+ }
+ } else {
+ enableKeywordSniff = true;
+ }
+
if (!Strings.isNullOrEmpty(properties.get(TYPE))
&& !Strings.isNullOrEmpty(properties.get(TYPE).trim())) {
mappingType = properties.get(TYPE).trim();
@@ -194,6 +221,7 @@ public class EsTable extends Table {
tableContext.put("majorVersion", majorVersion.toString());
}
tableContext.put("enableDocValueScan", String.valueOf(enableDocValueScan));
+ tableContext.put("enableKeywordSniff", String.valueOf(enableKeywordSniff));
}
public TTableDescriptor toThrift() {
@@ -278,6 +306,11 @@ public class EsTable extends Table {
}
enableDocValueScan = Boolean.parseBoolean(tableContext.get("enableDocValueScan"));
+ if (tableContext.containsKey("enableKeywordSniff")) {
+ enableKeywordSniff = Boolean.parseBoolean(tableContext.get("enableKeywordSniff"));
+ } else {
+ enableKeywordSniff = true;
+ }
PartitionType partType = PartitionType.valueOf(Text.readString(in));
if (partType == PartitionType.UNPARTITIONED) {
@@ -311,6 +344,7 @@ public class EsTable extends Table {
tableContext.put("mappingType", mappingType);
tableContext.put("transport", transport);
tableContext.put("enableDocValueScan", "false");
+ tableContext.put(KEYWORD_SNIFF, "true");
}
}
diff --git a/fe/src/main/java/org/apache/doris/external/EsStateStore.java b/fe/src/main/java/org/apache/doris/external/EsStateStore.java
index c121c48..0ac05f2 100644
--- a/fe/src/main/java/org/apache/doris/external/EsStateStore.java
+++ b/fe/src/main/java/org/apache/doris/external/EsStateStore.java
@@ -90,9 +90,6 @@ public class EsStateStore extends MasterDaemon {
esTable.getUserName(), esTable.getPasswd());
// if user not specify the es version, try to get the remote cluster versoin
// in the future, we maybe need this version
- if (esTable.majorVersion == null) {
- esTable.majorVersion = client.version();
- }
String indexMetaData = client.getIndexMetaData(esTable.getIndexName());
if (indexMetaData == null) {
continue;
@@ -197,7 +194,7 @@ public class EsStateStore extends MasterDaemon {
// {"city": "city.raw"}
JSONObject indicesMetaMap = jsonObject.getJSONObject("metadata").getJSONObject("indices");
JSONObject indexMetaMap = indicesMetaMap.optJSONObject(esTable.getIndexName());
- if (esTable.isDocValueScanEnable() && indexMetaMap != null) {
+ if (indexMetaMap != null && (esTable.isKeywordSniffEnable() || esTable.isDocValueScanEnable())) {
JSONObject mappings = indexMetaMap.optJSONObject("mappings");
JSONObject rootSchema = mappings.optJSONObject(esTable.getMappingType());
JSONObject schema = rootSchema.optJSONObject("properties");
@@ -209,36 +206,54 @@ public class EsStateStore extends MasterDaemon {
}
JSONObject fieldObject = schema.optJSONObject(colName);
String fieldType = fieldObject.optString("type");
- if (EsTable.DEFAULT_DOCVALUE_DISABLED_FIELDS.contains(fieldType)) {
- JSONObject fieldsObject = fieldObject.optJSONObject("fields");
- if (fieldsObject != null) {
- for (String key : fieldsObject.keySet()) {
- JSONObject innerTypeObject = fieldsObject.optJSONObject(key);
- if (EsTable.DEFAULT_DOCVALUE_DISABLED_FIELDS.contains(innerTypeObject.optString("type"))) {
- continue;
- }
- if (innerTypeObject.has("doc_values")) {
- boolean docValue = innerTypeObject.getBoolean("doc_values");
- if (docValue) {
- esTable.addDocValueField(colName, colName);
+ // string-type field used keyword type to generate predicate
+ if (esTable.isKeywordSniffEnable()) {
+ // if text field type seen, we should use the `field` keyword type?
+ if ("text".equals(fieldType)) {
+ JSONObject fieldsObject = fieldObject.optJSONObject("fields");
+ if (fieldsObject != null) {
+ for (String key : fieldsObject.keySet()) {
+ JSONObject innerTypeObject = fieldsObject.optJSONObject(key);
+ // just for text type
+ if ("keyword".equals(innerTypeObject.optString("type"))) {
+ esTable.addFetchField(colName, colName + "." + key);
}
- } else {
- // a : {c : {}} -> a -> a.c
- esTable.addDocValueField(colName, colName + "." + key);
}
}
}
- // skip this field
- continue;
}
- // set doc_value = false manually
- if (fieldObject.has("doc_values")) {
- boolean docValue = fieldObject.optBoolean("doc_values");
- if (!docValue) {
+ if (esTable.isDocValueScanEnable()) {
+ if (EsTable.DEFAULT_DOCVALUE_DISABLED_FIELDS.contains(fieldType)) {
+ JSONObject fieldsObject = fieldObject.optJSONObject("fields");
+ if (fieldsObject != null) {
+ for (String key : fieldsObject.keySet()) {
+ JSONObject innerTypeObject = fieldsObject.optJSONObject(key);
+ if (EsTable.DEFAULT_DOCVALUE_DISABLED_FIELDS.contains(innerTypeObject.optString("type"))) {
+ continue;
+ }
+ if (innerTypeObject.has("doc_values")) {
+ boolean docValue = innerTypeObject.getBoolean("doc_values");
+ if (docValue) {
+ esTable.addDocValueField(colName, colName);
+ }
+ } else {
+ // a : {c : {}} -> a -> a.c
+ esTable.addDocValueField(colName, colName + "." + key);
+ }
+ }
+ }
+ // skip this field
continue;
}
+ // set doc_value = false manually
+ if (fieldObject.has("doc_values")) {
+ boolean docValue = fieldObject.optBoolean("doc_values");
+ if (!docValue) {
+ continue;
+ }
+ }
+ esTable.addDocValueField(colName, colName);
}
- esTable.addDocValueField(colName, colName);
}
}
diff --git a/fe/src/main/java/org/apache/doris/planner/EsScanNode.java b/fe/src/main/java/org/apache/doris/planner/EsScanNode.java
index 8c0e5bb..10e878d 100644
--- a/fe/src/main/java/org/apache/doris/planner/EsScanNode.java
+++ b/fe/src/main/java/org/apache/doris/planner/EsScanNode.java
@@ -123,6 +123,9 @@ public class EsScanNode extends ScanNode {
if (table.isDocValueScanEnable()) {
esScanNode.setDocvalue_context(table.docValueContext());
}
+ if (table.isKeywordSniffEnable() && table.fieldsContext().size() > 0) {
+ esScanNode.setFields_context(table.fieldsContext());
+ }
msg.es_scan_node = esScanNode;
}
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index ef51eb9..5f04435 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -222,6 +222,18 @@ struct TEsScanNode {
// {"city": "city.raw"}
// use select city from table, if enable the docvalue, we will fetch the `city` field value from `city.raw`
3: optional map<string, string> docvalue_context
+ // used to indicate which string-type field predicate should used xxx.keyword etc.
+ // "k1": {
+ // "type": "text",
+ // "fields": {
+ // "keyword": {
+ // "type": "keyword",
+ // "ignore_above": 256
+ // }
+ // }
+ // }
+ // k1 > 'abc' -> k1.keyword > 'abc'
+ 4: optional map<string, string> fields_context
}
struct TMiniLoadEtlFunction {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org