You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by zh...@apache.org on 2020/04/13 15:07:40 UTC

[incubator-doris] branch master updated: [ES Connector] Add field context for string field keyword type (#3305)

This is an automated email from the ASF dual-hosted git repository.

zhaoc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new a467c6f  [ES Connector] Add field context for string field keyword type (#3305)
a467c6f is described below

commit a467c6f81f5c7bd81c94401ee75ad84ab1dc4739
Author: Yunfeng,Wu <wu...@baidu.com>
AuthorDate: Mon Apr 13 23:07:33 2020 +0800

    [ES Connector] Add field context for string field keyword type (#3305)
    
    This PR is only a transitional approach; it would be better to move the predicate transformation from Doris BE to Doris FE, so that Doris BE is only responsible for fetching data from ES.
    
    Add an `enable_keyword_sniff` configuration item when creating an external Elasticsearch table. It defaults to true; when enabled, Doris sniffs the `keyword` sub-field of an analyzed `text` field and returns its `json_path`, which is used in place of the original column name.
    
    ```
    CREATE EXTERNAL TABLE `test` (
      `k1` varchar(20) COMMENT "",
      `create_time` datetime COMMENT ""
    ) ENGINE=ELASTICSEARCH
    PROPERTIES (
    "hosts" = "http://10.74.167.16:8200",
    "user" = "root",
    "password" = "root",
    "index" = "test",
    "type" = "doc",
    "enable_keyword_sniff" = "true"
    );
    ```
    Note: `enable_keyword_sniff` defaults to "true".
    
    Run this SQL:
    
    ```
    select * from test where k1 = "wu yun feng"
    ```
    The resulting predicate DSL is:
    
    ```
    {"term":{"k1.keyword":"wu yun feng"}}
    ```
    This PR also removes the Elasticsearch version detection logic, since it is currently unused; it may be needed again in the future.
---
 be/src/exec/es/es_predicate.cpp                    | 27 +++++++--
 be/src/exec/es/es_predicate.h                      |  5 ++
 be/src/exec/es_http_scan_node.cpp                  |  5 ++
 be/src/exec/es_http_scan_node.h                    |  1 +
 .../java/org/apache/doris/catalog/Catalog.java     |  5 +-
 .../java/org/apache/doris/catalog/EsTable.java     | 36 +++++++++++-
 .../org/apache/doris/external/EsStateStore.java    | 67 +++++++++++++---------
 .../java/org/apache/doris/planner/EsScanNode.java  |  3 +
 gensrc/thrift/PlanNodes.thrift                     | 12 ++++
 9 files changed, 126 insertions(+), 35 deletions(-)

diff --git a/be/src/exec/es/es_predicate.cpp b/be/src/exec/es/es_predicate.cpp
index 95c5724..c8528c8 100644
--- a/be/src/exec/es/es_predicate.cpp
+++ b/be/src/exec/es/es_predicate.cpp
@@ -253,9 +253,13 @@ Status EsPredicate::build_disjuncts_list(const Expr* conjunct) {
         }
 
         ExtLiteral literal(expr->type().type, _context->get_value(expr, NULL));
+        std::string col = slot_desc->col_name();
+        if (_field_context.find(col) != _field_context.end()) {
+            col = _field_context[col];
+        }
         ExtPredicate* predicate = new ExtBinaryPredicate(
                     TExprNodeType::BINARY_PRED,
-                    slot_desc->col_name(),
+                    col,
                     slot_desc->type(),
                     op,
                     literal);
@@ -298,8 +302,12 @@ Status EsPredicate::build_disjuncts_list(const Expr* conjunct) {
         } else {
             is_not_null = true;
         }
+        std::string col = slot_desc->col_name();
+        if (_field_context.find(col) != _field_context.end()) {
+            col = _field_context[col];
+        }
         // use TExprNodeType::IS_NULL_PRED for BooleanQueryBuilder translate
-        ExtIsNullPredicate* predicate = new ExtIsNullPredicate(TExprNodeType::IS_NULL_PRED, slot_desc->col_name(), slot_desc->type(), is_not_null);
+        ExtIsNullPredicate* predicate = new ExtIsNullPredicate(TExprNodeType::IS_NULL_PRED, col, slot_desc->type(), is_not_null);
         _disjuncts.push_back(predicate);
         return Status::OK();
     }
@@ -331,11 +339,14 @@ Status EsPredicate::build_disjuncts_list(const Expr* conjunct) {
         if (type != TYPE_VARCHAR && type != TYPE_CHAR) {
             return Status::InternalError("build disjuncts failed: like value is not a string");
         }
-
+        std::string col = slot_desc->col_name();
+        if (_field_context.find(col) != _field_context.end()) {
+            col = _field_context[col];
+        }
         ExtLiteral literal(type, _context->get_value(expr, NULL));
         ExtPredicate* predicate = new ExtLikePredicate(
                     TExprNodeType::LIKE_PRED,
-                    slot_desc->col_name(),
+                    col,
                     slot_desc->type(),
                     literal);
 
@@ -380,11 +391,14 @@ Status EsPredicate::build_disjuncts_list(const Expr* conjunct) {
             in_pred_values.emplace_back(literal);
             iter->next();
         }
-
+        std::string col = slot_desc->col_name();
+        if (_field_context.find(col) != _field_context.end()) {
+            col = _field_context[col];
+        }
         ExtPredicate* predicate = new ExtInPredicate(
                     TExprNodeType::IN_PRED,
                     pred->is_not_in(),
-                    slot_desc->col_name(),
+                    col,
                     slot_desc->type(),
                     in_pred_values);
         _disjuncts.push_back(predicate);
@@ -399,6 +413,7 @@ Status EsPredicate::build_disjuncts_list(const Expr* conjunct) {
             std::vector<EsPredicate*> conjuncts;
             for (int i = 0; i < conjunct->get_num_children(); ++i) {
                 EsPredicate *predicate = _pool->add(new EsPredicate(_context, _tuple_desc, _pool));
+                predicate->set_field_context(_field_context);
                 Status status = predicate->build_disjuncts_list(conjunct->children()[i]);
                 if (status.ok()) {
                     conjuncts.push_back(predicate);
diff --git a/be/src/exec/es/es_predicate.h b/be/src/exec/es/es_predicate.h
index 6e1958b..3618873 100644
--- a/be/src/exec/es/es_predicate.h
+++ b/be/src/exec/es/es_predicate.h
@@ -198,6 +198,10 @@ public:
         return _es_query_status;
     }
 
+    void set_field_context(const std::map<std::string, std::string>& field_context) {
+        _field_context = field_context;
+    }
+
 private:
     Status build_disjuncts_list(const Expr* conjunct);
     bool is_match_func(const Expr* conjunct);
@@ -209,6 +213,7 @@ private:
     std::vector<ExtPredicate*> _disjuncts;
     Status _es_query_status;
     ObjectPool *_pool;
+    std::map<std::string, std::string> _field_context;
 };
 
 }
diff --git a/be/src/exec/es_http_scan_node.cpp b/be/src/exec/es_http_scan_node.cpp
index c6c634b..cb5eeb2 100644
--- a/be/src/exec/es_http_scan_node.cpp
+++ b/be/src/exec/es_http_scan_node.cpp
@@ -59,6 +59,10 @@ Status EsHttpScanNode::init(const TPlanNode& tnode, RuntimeState* state) {
     if (tnode.es_scan_node.__isset.docvalue_context) {
         _docvalue_context = tnode.es_scan_node.docvalue_context;
     }
+
+    if (tnode.es_scan_node.__isset.fields_context) {
+        _fields_context = tnode.es_scan_node.fields_context;
+    }
     return Status::OK();
 }
 
@@ -93,6 +97,7 @@ Status EsHttpScanNode::build_conjuncts_list() {
     for (int i = 0; i < _conjunct_ctxs.size(); ++i) {
         EsPredicate* predicate = _pool->add(
                     new EsPredicate(_conjunct_ctxs[i], _tuple_desc, _pool));
+        predicate->set_field_context(_fields_context);
         status = predicate->build_disjuncts_list();
         if (status.ok()) {
             _predicates.push_back(predicate);
diff --git a/be/src/exec/es_http_scan_node.h b/be/src/exec/es_http_scan_node.h
index 9990a76..89cca57 100644
--- a/be/src/exec/es_http_scan_node.h
+++ b/be/src/exec/es_http_scan_node.h
@@ -100,6 +100,7 @@ private:
     std::vector<std::promise<Status>> _scanners_status;
     std::map<std::string, std::string> _properties;
     std::map<std::string, std::string> _docvalue_context;
+    std::map<std::string, std::string> _fields_context;
     std::vector<TScanRangeParams> _scan_ranges;
     std::vector<std::string> _column_names;
 
diff --git a/fe/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/src/main/java/org/apache/doris/catalog/Catalog.java
index 38cd584..928b30b 100644
--- a/fe/src/main/java/org/apache/doris/catalog/Catalog.java
+++ b/fe/src/main/java/org/apache/doris/catalog/Catalog.java
@@ -3997,8 +3997,9 @@ public class Catalog {
             sb.append("\"password\" = \"").append(hidePassword ? "" : esTable.getPasswd()).append("\",\n");
             sb.append("\"index\" = \"").append(esTable.getIndexName()).append("\",\n");
             sb.append("\"type\" = \"").append(esTable.getMappingType()).append("\",\n");
-            sb.append("\"transport\" = \"").append(esTable.getTransport()).append("\"\n");
-            sb.append("\"enable_docvalue_scan\" = \"").append(esTable.isDocValueScanEnable()).append("\"\n");
+            sb.append("\"transport\" = \"").append(esTable.getTransport()).append("\",\n");
+            sb.append("\"enable_docvalue_scan\" = \"").append(esTable.isDocValueScanEnable()).append("\",\n");
+            sb.append("\"enable_keyword_sniff\" = \"").append(esTable.isKeywordSniffEnable()).append("\"\n");
             sb.append(")");
         }
         sb.append(";");
diff --git a/fe/src/main/java/org/apache/doris/catalog/EsTable.java b/fe/src/main/java/org/apache/doris/catalog/EsTable.java
index 2045d6c..252d962 100644
--- a/fe/src/main/java/org/apache/doris/catalog/EsTable.java
+++ b/fe/src/main/java/org/apache/doris/catalog/EsTable.java
@@ -56,6 +56,7 @@ public class EsTable extends Table {
     public static final String TRANSPORT_HTTP = "http";
     public static final String TRANSPORT_THRIFT = "thrift";
     public static final String DOC_VALUE_SCAN = "enable_docvalue_scan";
+    public static final String KEYWORD_SNIFF = "enable_keyword_sniff";
 
     private String hosts;
     private String[] seeds;
@@ -69,6 +70,7 @@ public class EsTable extends Table {
     private PartitionInfo partitionInfo;
     private EsTableState esTableState;
     private boolean enableDocValueScan = false;
+    private boolean enableKeywordSniff = true;
 
     public EsMajorVersion majorVersion = null;
 
@@ -93,6 +95,8 @@ public class EsTable extends Table {
     // use select city from table, if enable the docvalue, we will fetch the `city` field value from `city.raw`
     private Map<String, String> docValueContext = new HashMap<>();
 
+    private Map<String, String> fieldsContext = new HashMap<>();
+
     public EsTable() {
         super(TableType.ELASTICSEARCH);
     }
@@ -104,6 +108,13 @@ public class EsTable extends Table {
         validate(properties);
     }
 
+    public void addFetchField(String originName, String replaceName) {
+        fieldsContext.put(originName, replaceName);
+    }
+
+    public Map<String, String> fieldsContext() {
+        return fieldsContext;
+    }
 
     public void addDocValueField(String name, String fieldsName) {
         docValueContext.put(name, fieldsName);
@@ -117,6 +128,10 @@ public class EsTable extends Table {
         return enableDocValueScan;
     }
 
+    public boolean isKeywordSniffEnable() {
+        return enableKeywordSniff;
+    }
+
 
     private void validate(Map<String, String> properties) throws DdlException {
         if (properties == null) {
@@ -159,7 +174,7 @@ public class EsTable extends Table {
             }
         }
 
-        // Explicit setting for cluster version to avoid detecting version failure
+        // enable doc value scan for Elasticsearch
         if (properties.containsKey(DOC_VALUE_SCAN)) {
             try {
                 enableDocValueScan = Boolean.parseBoolean(properties.get(DOC_VALUE_SCAN).trim());
@@ -172,6 +187,18 @@ public class EsTable extends Table {
             enableDocValueScan = false;
         }
 
+        if (properties.containsKey(KEYWORD_SNIFF)) {
+            try {
+                enableKeywordSniff = Boolean.parseBoolean(properties.get(KEYWORD_SNIFF).trim());
+            } catch (Exception e) {
+                throw new DdlException("fail to parse enable_keyword_sniff, enable_keyword_sniff= "
+                        + properties.get(VERSION).trim() + " ,`enable_keyword_sniff`"
+                        + " shoud be like 'true' or 'false', value should be double quotation marks");
+            }
+        } else {
+            enableKeywordSniff = true;
+        }
+
         if (!Strings.isNullOrEmpty(properties.get(TYPE))
                 && !Strings.isNullOrEmpty(properties.get(TYPE).trim())) {
             mappingType = properties.get(TYPE).trim();
@@ -194,6 +221,7 @@ public class EsTable extends Table {
             tableContext.put("majorVersion", majorVersion.toString());
         }
         tableContext.put("enableDocValueScan", String.valueOf(enableDocValueScan));
+        tableContext.put("enableKeywordSniff", String.valueOf(enableKeywordSniff));
     }
 
     public TTableDescriptor toThrift() {
@@ -278,6 +306,11 @@ public class EsTable extends Table {
             }
 
             enableDocValueScan = Boolean.parseBoolean(tableContext.get("enableDocValueScan"));
+            if (tableContext.containsKey("enableKeywordSniff")) {
+                enableKeywordSniff = Boolean.parseBoolean(tableContext.get("enableKeywordSniff"));
+            } else {
+                enableKeywordSniff = true;
+            }
 
             PartitionType partType = PartitionType.valueOf(Text.readString(in));
             if (partType == PartitionType.UNPARTITIONED) {
@@ -311,6 +344,7 @@ public class EsTable extends Table {
             tableContext.put("mappingType", mappingType);
             tableContext.put("transport", transport);
             tableContext.put("enableDocValueScan", "false");
+            tableContext.put(KEYWORD_SNIFF, "true");
         }
     }
 
diff --git a/fe/src/main/java/org/apache/doris/external/EsStateStore.java b/fe/src/main/java/org/apache/doris/external/EsStateStore.java
index c121c48..0ac05f2 100644
--- a/fe/src/main/java/org/apache/doris/external/EsStateStore.java
+++ b/fe/src/main/java/org/apache/doris/external/EsStateStore.java
@@ -90,9 +90,6 @@ public class EsStateStore extends MasterDaemon {
                         esTable.getUserName(), esTable.getPasswd());
                 // if user not specify the es version, try to get the remote cluster versoin
                 // in the future, we maybe need this version
-                if (esTable.majorVersion == null) {
-                    esTable.majorVersion = client.version();
-                }
                 String indexMetaData = client.getIndexMetaData(esTable.getIndexName());
                 if (indexMetaData == null) {
                     continue;
@@ -197,7 +194,7 @@ public class EsStateStore extends MasterDaemon {
         // {"city": "city.raw"}
         JSONObject indicesMetaMap = jsonObject.getJSONObject("metadata").getJSONObject("indices");
         JSONObject indexMetaMap = indicesMetaMap.optJSONObject(esTable.getIndexName());
-        if (esTable.isDocValueScanEnable() && indexMetaMap != null) {
+        if (indexMetaMap != null && (esTable.isKeywordSniffEnable() || esTable.isDocValueScanEnable())) {
             JSONObject mappings = indexMetaMap.optJSONObject("mappings");
             JSONObject rootSchema = mappings.optJSONObject(esTable.getMappingType());
             JSONObject schema = rootSchema.optJSONObject("properties");
@@ -209,36 +206,54 @@ public class EsStateStore extends MasterDaemon {
                 }
                 JSONObject fieldObject = schema.optJSONObject(colName);
                 String fieldType = fieldObject.optString("type");
-                if (EsTable.DEFAULT_DOCVALUE_DISABLED_FIELDS.contains(fieldType)) {
-                    JSONObject fieldsObject = fieldObject.optJSONObject("fields");
-                    if (fieldsObject != null) {
-                        for (String key : fieldsObject.keySet()) {
-                            JSONObject innerTypeObject = fieldsObject.optJSONObject(key);
-                            if (EsTable.DEFAULT_DOCVALUE_DISABLED_FIELDS.contains(innerTypeObject.optString("type"))) {
-                                continue;
-                            }
-                            if (innerTypeObject.has("doc_values")) {
-                                boolean docValue = innerTypeObject.getBoolean("doc_values");
-                                if (docValue) {
-                                    esTable.addDocValueField(colName, colName);
+                // for string-type fields, generate predicates against the keyword sub-field
+                if (esTable.isKeywordSniffEnable()) {
+                    // for a `text` field, use its `keyword`-typed sub-field (if any) in predicates
+                    if ("text".equals(fieldType)) {
+                        JSONObject fieldsObject = fieldObject.optJSONObject("fields");
+                        if (fieldsObject != null) {
+                            for (String key : fieldsObject.keySet()) {
+                                JSONObject innerTypeObject = fieldsObject.optJSONObject(key);
+                                // just for text type
+                                if ("keyword".equals(innerTypeObject.optString("type"))) {
+                                    esTable.addFetchField(colName, colName + "." + key);
                                 }
-                            } else {
-                                // a : {c : {}} -> a -> a.c
-                                esTable.addDocValueField(colName, colName + "." + key);
                             }
                         }
                     }
-                    // skip this field
-                    continue;
                 }
-                // set doc_value = false manually
-                if (fieldObject.has("doc_values")) {
-                    boolean docValue = fieldObject.optBoolean("doc_values");
-                    if (!docValue) {
+                if (esTable.isDocValueScanEnable()) {
+                    if (EsTable.DEFAULT_DOCVALUE_DISABLED_FIELDS.contains(fieldType)) {
+                        JSONObject fieldsObject = fieldObject.optJSONObject("fields");
+                        if (fieldsObject != null) {
+                            for (String key : fieldsObject.keySet()) {
+                                JSONObject innerTypeObject = fieldsObject.optJSONObject(key);
+                                if (EsTable.DEFAULT_DOCVALUE_DISABLED_FIELDS.contains(innerTypeObject.optString("type"))) {
+                                    continue;
+                                }
+                                if (innerTypeObject.has("doc_values")) {
+                                    boolean docValue = innerTypeObject.getBoolean("doc_values");
+                                    if (docValue) {
+                                        esTable.addDocValueField(colName, colName);
+                                    }
+                                } else {
+                                    // a : {c : {}} -> a -> a.c
+                                    esTable.addDocValueField(colName, colName + "." + key);
+                                }
+                            }
+                        }
+                        // skip this field
                         continue;
                     }
+                    // set doc_value = false manually
+                    if (fieldObject.has("doc_values")) {
+                        boolean docValue = fieldObject.optBoolean("doc_values");
+                        if (!docValue) {
+                            continue;
+                        }
+                    }
+                    esTable.addDocValueField(colName, colName);
                 }
-                esTable.addDocValueField(colName, colName);
             }
         }
 
diff --git a/fe/src/main/java/org/apache/doris/planner/EsScanNode.java b/fe/src/main/java/org/apache/doris/planner/EsScanNode.java
index 8c0e5bb..10e878d 100644
--- a/fe/src/main/java/org/apache/doris/planner/EsScanNode.java
+++ b/fe/src/main/java/org/apache/doris/planner/EsScanNode.java
@@ -123,6 +123,9 @@ public class EsScanNode extends ScanNode {
         if (table.isDocValueScanEnable()) {
             esScanNode.setDocvalue_context(table.docValueContext());
         }
+        if (table.isKeywordSniffEnable() && table.fieldsContext().size() > 0) {
+            esScanNode.setFields_context(table.fieldsContext());
+        }
         msg.es_scan_node = esScanNode;
 
     }
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index ef51eb9..5f04435 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -222,6 +222,18 @@ struct TEsScanNode {
     // {"city": "city.raw"}
     // use select city from table, if enable the docvalue, we will fetch the `city` field value from `city.raw`
     3: optional map<string, string> docvalue_context
+    // maps a string-type column to the sub-field (e.g. xxx.keyword) its predicates should use
+    // "k1": {
+    //    "type": "text",
+    //    "fields": {
+    //        "keyword": {
+    //            "type": "keyword",
+    //            "ignore_above": 256
+    //           }
+    //    }
+    // }
+    // k1 > 'abc' -> k1.keyword > 'abc'
+    4: optional map<string, string> fields_context
 }
 
 struct TMiniLoadEtlFunction {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org