You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2020/07/10 10:37:44 UTC

[incubator-doris] branch master updated: [Doris On ES] Add docvalue limitation for doc_values scan and enable doc_values scan default (#4055)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 265c26f  [Doris On ES] Add docvalue limitation for doc_values scan and enable doc_values scan default (#4055)
265c26f is described below

commit 265c26f67d80ec7074572c33bb3d71a773781bb2
Author: Yunfeng,Wu <wu...@baidu.com>
AuthorDate: Fri Jul 10 18:37:36 2020 +0800

    [Doris On ES] Add docvalue limitation for doc_values scan and enable doc_values scan default (#4055)
---
 be/src/exec/es/es_scan_reader.h                    |  1 +
 be/src/exec/es/es_scroll_query.cpp                 | 20 +++++---
 .../java/org/apache/doris/catalog/Catalog.java     |  1 +
 .../java/org/apache/doris/catalog/EsTable.java     | 52 +++++++++++++++++++--
 .../java/org/apache/doris/planner/EsScanNode.java  | 54 +++++++++++++++++-----
 5 files changed, 105 insertions(+), 23 deletions(-)

diff --git a/be/src/exec/es/es_scan_reader.h b/be/src/exec/es/es_scan_reader.h
index 3bd64cf..52b936c 100644
--- a/be/src/exec/es/es_scan_reader.h
+++ b/be/src/exec/es/es_scan_reader.h
@@ -40,6 +40,7 @@ public:
     static constexpr const char* KEY_QUERY = "query";
     static constexpr const char* KEY_BATCH_SIZE = "batch_size";
     static constexpr const char* KEY_TERMINATE_AFTER = "limit";
+    static constexpr const char* KEY_DOC_VALUES_MODE = "doc_values_mode";
     ESScanReader(const std::string& target, const std::map<std::string, std::string>& props, bool doc_value_mode);
     ~ESScanReader();
 
diff --git a/be/src/exec/es/es_scroll_query.cpp b/be/src/exec/es/es_scroll_query.cpp
index 0c4f581..90d68f0 100644
--- a/be/src/exec/es/es_scroll_query.cpp
+++ b/be/src/exec/es/es_scroll_query.cpp
@@ -76,14 +76,20 @@ std::string ESScrollQueryBuilder::build(const std::map<std::string, std::string>
     // note: add `query` for this value....
     es_query_dsl.AddMember("query", query_node, allocator);
     bool pure_docvalue = true;
-    // check docvalue sacan optimization
-    if (docvalue_context.size() == 0 || docvalue_context.size() < fields.size()) {
-        pure_docvalue = false;
+
+    // Doris FE already has checked docvalue-scan optimization
+    if (properties.find(ESScanReader::KEY_DOC_VALUES_MODE) != properties.end()) {
+        pure_docvalue = atoi(properties.at(ESScanReader::KEY_DOC_VALUES_MODE).c_str());
     } else {
-        for (auto& select_field : fields) {
-            if (docvalue_context.find(select_field) == docvalue_context.end()) {
-                pure_docvalue = false;
-                break;
+        // check docvalue scan optimization, used for compatibility
+        if (docvalue_context.size() == 0 || docvalue_context.size() < fields.size()) {
+            pure_docvalue = false;
+        } else {
+            for (auto& select_field : fields) {
+                if (docvalue_context.find(select_field) == docvalue_context.end()) {
+                    pure_docvalue = false;
+                    break;
+                }
             }
         }
     }
diff --git a/fe/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/src/main/java/org/apache/doris/catalog/Catalog.java
index d1a4460..efeda65 100755
--- a/fe/src/main/java/org/apache/doris/catalog/Catalog.java
+++ b/fe/src/main/java/org/apache/doris/catalog/Catalog.java
@@ -4036,6 +4036,7 @@ public class Catalog {
             sb.append("\"type\" = \"").append(esTable.getMappingType()).append("\",\n");
             sb.append("\"transport\" = \"").append(esTable.getTransport()).append("\",\n");
             sb.append("\"enable_docvalue_scan\" = \"").append(esTable.isDocValueScanEnable()).append("\",\n");
+            sb.append("\"max_docvalue_fields\" = \"").append(esTable.maxDocValueFields()).append("\",\n");
             sb.append("\"enable_keyword_sniff\" = \"").append(esTable.isKeywordSniffEnable()).append("\"\n");
             sb.append(")");
         } else if (table.getType() == TableType.HIVE) {
diff --git a/fe/src/main/java/org/apache/doris/catalog/EsTable.java b/fe/src/main/java/org/apache/doris/catalog/EsTable.java
index c885fb5..19fa6f1 100644
--- a/fe/src/main/java/org/apache/doris/catalog/EsTable.java
+++ b/fe/src/main/java/org/apache/doris/catalog/EsTable.java
@@ -57,28 +57,51 @@ public class EsTable extends Table {
     public static final String TYPE = "type";
     public static final String TRANSPORT = "transport";
     public static final String VERSION = "version";
+    public static final String DOC_VALUES_MODE = "doc_values_mode";
 
     public static final String TRANSPORT_HTTP = "http";
     public static final String TRANSPORT_THRIFT = "thrift";
     public static final String DOC_VALUE_SCAN = "enable_docvalue_scan";
     public static final String KEYWORD_SNIFF = "enable_keyword_sniff";
+    public static final String MAX_DOCVALUE_FIELDS = "max_docvalue_fields";
 
     private String hosts;
     private String[] seeds;
     private String userName = "";
     private String passwd = "";
+    // index name can be a specific index, a wildcard pattern, or an alias.
     private String indexName;
+
+    // mapping type used for `indexName`, defaults to `_doc`
     private String mappingType = "_doc";
     private String transport = "http";
     // only save the partition definition, save the partition key,
     // partition list is got from es cluster dynamically and is saved in esTableState
     private PartitionInfo partitionInfo;
     private EsTablePartitions esTablePartitions;
-    private boolean enableDocValueScan = false;
-    private boolean enableKeywordSniff = true;
 
+    // Whether to enable the doc_values scan optimization, which fetches field values faster; defaults to true
+    private boolean enableDocValueScan = true;
+    // Whether to enable keyword sniffing for more reasonable filtering; defaults to true
+    private boolean enableKeywordSniff = true;
+    // If the number of fields whose values would be extracted from `doc_value` exceeds this limit,
+    // the scan downgrades to extracting the values from `stored_fields`
+    private int maxDocValueFields = DEFAULT_MAX_DOCVALUE_FIELDS;
+
+    // The Solr "doc_values vs stored_fields" performance smackdown indicates:
+    // retrieving a high number of fields leads to a noticeable worsening
+    // of performance if DocValues are used.
+    // Instead, the (almost) surprising thing is that, when returning fewer than 20 fields,
+    // DocValues perform better than stored fields, and the difference shrinks as the number of returned fields increases.
+    // Asking for 9 DocValues fields and 1 stored field takes an average query time of 6.86 (more than returning 10 stored fields).
+    // We therefore use a slightly conservative default of 20, and also expose a configurable property for expert users.
+    // @see `MAX_DOCVALUE_FIELDS`
+    private static final int DEFAULT_MAX_DOCVALUE_FIELDS = 20;
+
+    // version would be used to be compatible with different ES Cluster
     public EsMajorVersion majorVersion = null;
 
+    // tableContext makes it convenient to persist configuration parameters uniformly
     private Map<String, String> tableContext = new HashMap<>();
 
     // record the latest and recently exception when sync ES table metadata (mapping, shard location)
@@ -104,6 +127,10 @@ public class EsTable extends Table {
         return esMetaStateTracker.searchContext().docValueFieldsContext();
     }
 
+    public int maxDocValueFields() { // field-count limit consulted when deciding on a doc_values scan
+        return maxDocValueFields;
+    }
+
     public boolean isDocValueScanEnable() {
         return enableDocValueScan;
     }
@@ -166,8 +193,6 @@ public class EsTable extends Table {
                         + properties.get(VERSION).trim() + " ,`enable_docvalue_scan`"
                         + " shoud be like 'true' or 'false', value should be double quotation marks");
             }
-        } else {
-            enableDocValueScan = false;
         }
 
         if (properties.containsKey(KEYWORD_SNIFF)) {
@@ -194,6 +219,17 @@ public class EsTable extends Table {
                         + " but value is " + transport);
             }
         }
+
+        if (properties.containsKey(MAX_DOCVALUE_FIELDS)) {
+            try {
+                maxDocValueFields = Integer.parseInt(properties.get(MAX_DOCVALUE_FIELDS).trim());
+                if (maxDocValueFields < 0) {
+                    maxDocValueFields = 0;
+                }
+            } catch (Exception e) {
+                maxDocValueFields = DEFAULT_MAX_DOCVALUE_FIELDS;
+            }
+        }
         tableContext.put("hosts", hosts);
         tableContext.put("userName", userName);
         tableContext.put("passwd", passwd);
@@ -205,6 +241,7 @@ public class EsTable extends Table {
         }
         tableContext.put("enableDocValueScan", String.valueOf(enableDocValueScan));
         tableContext.put("enableKeywordSniff", String.valueOf(enableKeywordSniff));
+        tableContext.put("maxDocValueFields", String.valueOf(maxDocValueFields));
     }
 
     public TTableDescriptor toThrift() {
@@ -294,6 +331,13 @@ public class EsTable extends Table {
             } else {
                 enableKeywordSniff = true;
             }
+            if (tableContext.containsKey("maxDocValueFields")) {
+                try {
+                    maxDocValueFields = Integer.parseInt(tableContext.get("maxDocValueFields"));
+                } catch (Exception e) {
+                    maxDocValueFields = DEFAULT_MAX_DOCVALUE_FIELDS;
+                }
+            }
 
             PartitionType partType = PartitionType.valueOf(Text.readString(in));
             if (partType == PartitionType.UNPARTITIONED) {
diff --git a/fe/src/main/java/org/apache/doris/planner/EsScanNode.java b/fe/src/main/java/org/apache/doris/planner/EsScanNode.java
index 03cfc70..36984bf 100644
--- a/fe/src/main/java/org/apache/doris/planner/EsScanNode.java
+++ b/fe/src/main/java/org/apache/doris/planner/EsScanNode.java
@@ -18,6 +18,7 @@
 package org.apache.doris.planner;
 
 import org.apache.doris.analysis.Analyzer;
+import org.apache.doris.analysis.SlotDescriptor;
 import org.apache.doris.analysis.TupleDescriptor;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.EsTable;
@@ -40,9 +41,6 @@ import org.apache.doris.thrift.TScanRange;
 import org.apache.doris.thrift.TScanRangeLocation;
 import org.apache.doris.thrift.TScanRangeLocations;
 
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -50,6 +48,9 @@ import com.google.common.collect.Multimap;
 import com.google.common.collect.Range;
 import com.google.common.collect.Sets;
 
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -59,7 +60,7 @@ import java.util.Random;
 import java.util.Set;
 
 public class EsScanNode extends ScanNode {
-    
+
     private static final Logger LOG = LogManager.getLogger(EsScanNode.class);
 
     private final Random random = new Random(System.currentTimeMillis());
@@ -80,10 +81,10 @@ public class EsScanNode extends ScanNode {
     @Override
     public void init(Analyzer analyzer) throws UserException {
         super.init(analyzer);
-        
+
         assignBackends();
     }
-    
+
     @Override
     public int getNumInstances() {
         return shardScanRanges.size();
@@ -93,7 +94,7 @@ public class EsScanNode extends ScanNode {
     public List<TScanRangeLocations> getScanRangeLocations(long maxScanRangeLength) {
         return shardScanRanges;
     }
-    
+
     @Override
     public void finalize(Analyzer analyzer) throws UserException {
         if (isFinalized) {
@@ -109,6 +110,34 @@ public class EsScanNode extends ScanNode {
         isFinalized = true;
     }
 
+    /**
+     * Decides whether all fields selected by this scan can be read via a doc_values scan.
+     * The verdict is encoded as an int (1 = yes, 0 = no) to facilitate Doris BE processing.
+     *
+     * @param desc            tuple descriptor whose slots are the fields to read from ES
+     * @param docValueContext mapping from an original field to its doc_value field
+     * @return 1 if the doc_values scan can be used, 0 otherwise
+     */
+    private int useDocValueScan(TupleDescriptor desc, Map<String, String> docValueContext) {
+        List<SlotDescriptor> slots = desc.getSlots();
+        // collect the column name behind every slot of the tuple
+        List<String> fieldNames = new ArrayList<>(slots.size());
+        for (SlotDescriptor slot : slots) {
+            fieldNames.add(slot.getColumn().getName());
+        }
+        // more fields than the table's max_docvalue_fields limit: do not use doc_values
+        if (fieldNames.size() > table.maxDocValueFields()) {
+            return 0;
+        }
+
+        // doc_values scan is usable only when every selected field has an entry
+        // in the doc_value mapping
+        if (docValueContext.keySet().containsAll(fieldNames)) {
+            return 1;
+        }
+        return 0;
+    }
+
     @Override
     protected void toThrift(TPlanNode msg) {
         if (EsTable.TRANSPORT_HTTP.equals(table.getTransport())) {
@@ -123,6 +152,7 @@ public class EsScanNode extends ScanNode {
         esScanNode.setProperties(properties);
         if (table.isDocValueScanEnable()) {
             esScanNode.setDocvalue_context(table.docValueContext());
+            properties.put(EsTable.DOC_VALUES_MODE, String.valueOf(useDocValueScan(desc, table.docValueContext())));
         }
         if (table.isKeywordSniffEnable() && table.fieldsContext().size() > 0) {
             esScanNode.setFields_context(table.fieldsContext());
@@ -169,9 +199,9 @@ public class EsScanNode extends ScanNode {
             }
         }
         if (LOG.isDebugEnabled()) {
-            LOG.debug("partition prune finished, unpartitioned index [{}], " 
-                    + "partitioned index [{}]", 
-                    String.join(",", unPartitionedIndices), 
+            LOG.debug("partition prune finished, unpartitioned index [{}], "
+                            + "partitioned index [{}]",
+                    String.join(",", unPartitionedIndices),
                     String.join(",", partitionedIndices));
         }
         int beIndex = random.nextInt(backendList.size());
@@ -241,7 +271,7 @@ public class EsScanNode extends ScanNode {
      * if the index name is an alias or index pattern, then the es table is related
      * with one or more indices some indices could be pruned by using partition info
      * in index settings currently only support range partition setting
-     * 
+     *
      * @param partitionInfo
      * @return
      * @throws AnalysisException
@@ -254,7 +284,7 @@ public class EsScanNode extends ScanNode {
         switch (partitionInfo.getType()) {
             case RANGE: {
                 RangePartitionInfo rangePartitionInfo = (RangePartitionInfo) partitionInfo;
-                    Map<Long, Range<PartitionKey>> keyRangeById = rangePartitionInfo.getIdToRange(false);
+                Map<Long, Range<PartitionKey>> keyRangeById = rangePartitionInfo.getIdToRange(false);
                 partitionPruner = new RangePartitionPruner(keyRangeById, rangePartitionInfo.getPartitionColumns(),
                         columnFilters);
                 return partitionPruner.prune();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org