You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by kx...@apache.org on 2023/06/09 04:11:01 UTC

[doris] 25/29: [Fix](external scan node) Use consistent hashing to select BEs only when the file cache is enabled. #20560

This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0-beta
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 13927140a64c3f1abbc8909c2eae13c615d808c9
Author: Jibing-Li <64...@users.noreply.github.com>
AuthorDate: Fri Jun 9 08:43:12 2023 +0800

    [Fix](external scan node) Use consistent hashing to select BEs only when the file cache is enabled. #20560
    
    Use consistent hashing to select the BE for each scan range only when the file cache is enabled, and move the consistent-hash BE assignment code into FederationBackendPolicy.
    Also fix a bug where EXPLAIN reported an incorrect split count and total file size.
---
 be/src/vec/exec/scan/vfile_scanner.cpp             |  2 +
 be/src/vec/exec/scan/vfile_scanner.h               |  1 +
 .../planner/external/FederationBackendPolicy.java  | 41 +++++++++++-
 .../doris/planner/external/FileQueryScanNode.java  | 26 ++++----
 .../main/java/org/apache/doris/qe/Coordinator.java | 73 ----------------------
 5 files changed, 55 insertions(+), 88 deletions(-)

diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp
index a539abc9f0..b0faa7f799 100644
--- a/be/src/vec/exec/scan/vfile_scanner.cpp
+++ b/be/src/vec/exec/scan/vfile_scanner.cpp
@@ -117,6 +117,7 @@ Status VFileScanner::prepare(
     _convert_to_output_block_timer =
             ADD_TIMER(_parent->_scanner_profile, "FileScannerConvertOuputBlockTime");
     _empty_file_counter = ADD_COUNTER(_parent->_scanner_profile, "EmptyFileNum", TUnit::UNIT);
+    _file_counter = ADD_COUNTER(_parent->_scanner_profile, "FileNumber", TUnit::UNIT);
 
     _file_cache_statistics.reset(new io::FileCacheStatistics());
     _io_ctx.reset(new io::IOContext());
@@ -698,6 +699,7 @@ Status VFileScanner::_get_next_reader() {
             return Status::InternalError("failed to init reader for file {}, err: {}", range.path,
                                          init_status.to_string());
         }
+        COUNTER_UPDATE(_file_counter, 1);
 
         _name_to_col_type.clear();
         _missing_cols.clear();
diff --git a/be/src/vec/exec/scan/vfile_scanner.h b/be/src/vec/exec/scan/vfile_scanner.h
index df4204d5ae..cfab3c26ce 100644
--- a/be/src/vec/exec/scan/vfile_scanner.h
+++ b/be/src/vec/exec/scan/vfile_scanner.h
@@ -164,6 +164,7 @@ private:
     RuntimeProfile::Counter* _pre_filter_timer = nullptr;
     RuntimeProfile::Counter* _convert_to_output_block_timer = nullptr;
     RuntimeProfile::Counter* _empty_file_counter = nullptr;
+    RuntimeProfile::Counter* _file_counter = nullptr;
 
     const std::unordered_map<std::string, int>* _col_name_to_slot_id;
     // single slot filter conjuncts
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FederationBackendPolicy.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FederationBackendPolicy.java
index 91300be1ad..daa1b151ea 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FederationBackendPolicy.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FederationBackendPolicy.java
@@ -20,17 +20,25 @@ package org.apache.doris.planner.external;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.ConsistentHash;
 import org.apache.doris.mysql.privilege.UserProperty;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.resource.Tag;
 import org.apache.doris.system.Backend;
 import org.apache.doris.system.BeSelectionPolicy;
+import org.apache.doris.thrift.TFileRangeDesc;
+import org.apache.doris.thrift.TScanRangeLocations;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
+import com.google.common.hash.Funnel;
+import com.google.common.hash.Hashing;
+import com.google.common.hash.PrimitiveSink;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import java.nio.charset.StandardCharsets;
 import java.util.Collections;
 import java.util.List;
 import java.util.Set;
@@ -38,11 +46,16 @@ import java.util.Set;
 public class FederationBackendPolicy {
     private static final Logger LOG = LogManager.getLogger(FederationBackendPolicy.class);
     private final List<Backend> backends = Lists.newArrayList();
+    private ConsistentHash<TScanRangeLocations, Backend> consistentHash;
 
     private int nextBe = 0;
+    private boolean initialized = false;
 
     public void init() throws UserException {
-        init(Collections.emptyList());
+        if (!initialized) {
+            init(Collections.emptyList());
+            initialized = true;
+        }
     }
 
     public void init(List<String> preLocations) throws UserException {
@@ -74,6 +87,9 @@ public class FederationBackendPolicy {
         if (backends.isEmpty()) {
             throw new UserException("No available backends");
         }
+        int virtualNumber = Math.max(Math.min(512 / backends.size(), 32), 2);
+        consistentHash = new ConsistentHash<>(Hashing.murmur3_128(), new ScanRangeHash(),
+                new BackendHash(), backends, virtualNumber);
     }
 
     public Backend getNextBe() {
@@ -82,6 +98,10 @@ public class FederationBackendPolicy {
         return selectedBackend;
     }
 
+    public Backend getNextConsistentBe(TScanRangeLocations scanRangeLocations) {
+        return consistentHash.getNode(scanRangeLocations);
+    }
+
     public int numBackends() {
         return backends.size();
     }
@@ -89,4 +109,23 @@ public class FederationBackendPolicy {
     public List<Backend> getBackends() {
         return backends;
     }
+
+    private static class BackendHash implements Funnel<Backend> {
+        @Override
+        public void funnel(Backend backend, PrimitiveSink primitiveSink) {
+            primitiveSink.putLong(backend.getId());
+        }
+    }
+
+    private static class ScanRangeHash implements Funnel<TScanRangeLocations> {
+        @Override
+        public void funnel(TScanRangeLocations scanRange, PrimitiveSink primitiveSink) {
+            Preconditions.checkState(scanRange.scan_range.isSetExtScanRange());
+            for (TFileRangeDesc desc : scanRange.scan_range.ext_scan_range.file_scan_range.ranges) {
+                primitiveSink.putBytes(desc.path.getBytes(StandardCharsets.UTF_8));
+                primitiveSink.putLong(desc.start_offset);
+                primitiveSink.putLong(desc.size);
+            }
+        }
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
index 4c794d083f..d73299d2c3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
@@ -40,6 +40,7 @@ import org.apache.doris.planner.external.iceberg.IcebergScanNode;
 import org.apache.doris.planner.external.iceberg.IcebergSplit;
 import org.apache.doris.planner.external.paimon.PaimonScanNode;
 import org.apache.doris.planner.external.paimon.PaimonSplit;
+import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.spi.Split;
 import org.apache.doris.statistics.StatisticalType;
 import org.apache.doris.system.Backend;
@@ -80,9 +81,6 @@ public abstract class FileQueryScanNode extends FileScanNode {
     protected Map<String, SlotDescriptor> destSlotDescByName;
     protected TFileScanRangeParams params;
 
-    protected int inputSplitNum = 0;
-    protected long inputFileSize = 0;
-
     /**
      * External file scan node for Query hms table
      * needCheckColumnPriv: Some of ExternalFileScanNode do not need to check column priv
@@ -220,7 +218,7 @@ public abstract class FileQueryScanNode extends FileScanNode {
     public void createScanRangeLocations() throws UserException {
         long start = System.currentTimeMillis();
         List<Split> inputSplits = getSplits();
-        this.inputSplitNum = inputSplits.size();
+        this.inputSplitsNum = inputSplits.size();
         if (inputSplits.isEmpty()) {
             return;
         }
@@ -268,7 +266,7 @@ public abstract class FileQueryScanNode extends FileScanNode {
                 scanRangeParams = new TFileScanRangeParams(params);
                 scanRangeParams.setCompressType(getFileCompressType(fileSplit));
             }
-            TScanRangeLocations curLocations = newLocations(scanRangeParams, backendPolicy);
+            TScanRangeLocations curLocations = newLocations(scanRangeParams);
 
             // If fileSplit has partition values, use the values collected from hive partitions.
             // Otherwise, use the values in file path.
@@ -290,17 +288,24 @@ public abstract class FileQueryScanNode extends FileScanNode {
             // }
 
             curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
+            TScanRangeLocation location = new TScanRangeLocation();
+            // Use consistent hash to assign the same scan range into the same backend among different queries
+            Backend selectedBackend = ConnectContext.get().getSessionVariable().enableFileCache
+                    ? backendPolicy.getNextConsistentBe(curLocations) : backendPolicy.getNextBe();
+            location.setBackendId(selectedBackend.getId());
+            location.setServer(new TNetworkAddress(selectedBackend.getHost(), selectedBackend.getBePort()));
+            curLocations.addToLocations(location);
             LOG.debug("assign to backend {} with table split: {} ({}, {}), location: {}",
                     curLocations.getLocations().get(0).getBackendId(), fileSplit.getPath(), fileSplit.getStart(),
                     fileSplit.getLength(), Joiner.on("|").join(fileSplit.getHosts()));
             scanRangeLocations.add(curLocations);
-            this.inputFileSize += fileSplit.getLength();
+            this.totalFileSize += fileSplit.getLength();
         }
         LOG.debug("create #{} ScanRangeLocations cost: {} ms",
                 scanRangeLocations.size(), (System.currentTimeMillis() - start));
     }
 
-    private TScanRangeLocations newLocations(TFileScanRangeParams params, FederationBackendPolicy backendPolicy) {
+    private TScanRangeLocations newLocations(TFileScanRangeParams params) {
         // Generate on file scan range
         TFileScanRange fileScanRange = new TFileScanRange();
         fileScanRange.setParams(params);
@@ -314,13 +319,6 @@ public abstract class FileQueryScanNode extends FileScanNode {
         // Locations
         TScanRangeLocations locations = new TScanRangeLocations();
         locations.setScanRange(scanRange);
-
-        TScanRangeLocation location = new TScanRangeLocation();
-        Backend selectedBackend = backendPolicy.getNextBe();
-        location.setBackendId(selectedBackend.getId());
-        location.setServer(new TNetworkAddress(selectedBackend.getHost(), selectedBackend.getBePort()));
-        locations.addToLocations(location);
-
         return locations;
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 91e01e6399..625c923941 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -31,7 +31,6 @@ import org.apache.doris.common.UserException;
 import org.apache.doris.common.profile.ExecutionProfile;
 import org.apache.doris.common.telemetry.ScopedSpan;
 import org.apache.doris.common.telemetry.Telemetry;
-import org.apache.doris.common.util.ConsistentHash;
 import org.apache.doris.common.util.DebugUtil;
 import org.apache.doris.common.util.ListUtil;
 import org.apache.doris.common.util.RuntimeProfile;
@@ -63,7 +62,6 @@ import org.apache.doris.planner.ScanNode;
 import org.apache.doris.planner.SetOperationNode;
 import org.apache.doris.planner.UnionNode;
 import org.apache.doris.planner.external.ExternalScanNode;
-import org.apache.doris.planner.external.FileQueryScanNode;
 import org.apache.doris.proto.InternalService;
 import org.apache.doris.proto.InternalService.PExecPlanFragmentResult;
 import org.apache.doris.proto.InternalService.PExecPlanFragmentStartRequest;
@@ -84,7 +82,6 @@ import org.apache.doris.thrift.TEsScanRange;
 import org.apache.doris.thrift.TExecPlanFragmentParams;
 import org.apache.doris.thrift.TExecPlanFragmentParamsList;
 import org.apache.doris.thrift.TExternalScanRange;
-import org.apache.doris.thrift.TFileRangeDesc;
 import org.apache.doris.thrift.TFileScanRange;
 import org.apache.doris.thrift.TNetworkAddress;
 import org.apache.doris.thrift.TPaloScanRange;
@@ -118,9 +115,6 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Multiset;
 import com.google.common.collect.Sets;
-import com.google.common.hash.Funnel;
-import com.google.common.hash.Hashing;
-import com.google.common.hash.PrimitiveSink;
 import io.opentelemetry.api.trace.Span;
 import io.opentelemetry.api.trace.SpanKind;
 import io.opentelemetry.context.Context;
@@ -132,10 +126,8 @@ import org.apache.logging.log4j.Logger;
 import org.apache.thrift.TException;
 import org.jetbrains.annotations.NotNull;
 
-import java.nio.charset.StandardCharsets;
 import java.time.LocalDateTime;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -275,25 +267,6 @@ public class Coordinator {
     // True if all scan node are ExternalScanNode.
     private boolean isAllExternalScan = true;
 
-    private static class BackendHash implements Funnel<Backend> {
-        @Override
-        public void funnel(Backend backend, PrimitiveSink primitiveSink) {
-            primitiveSink.putLong(backend.getId());
-        }
-    }
-
-    private static class ScanRangeHash implements Funnel<TScanRangeLocations> {
-        @Override
-        public void funnel(TScanRangeLocations scanRange, PrimitiveSink primitiveSink) {
-            Preconditions.checkState(scanRange.scan_range.isSetExtScanRange());
-            for (TFileRangeDesc desc : scanRange.scan_range.ext_scan_range.file_scan_range.ranges) {
-                primitiveSink.putBytes(desc.path.getBytes(StandardCharsets.UTF_8));
-                primitiveSink.putLong(desc.start_offset);
-                primitiveSink.putLong(desc.size);
-            }
-        }
-    }
-
     // Used for query/insert
     public Coordinator(ConnectContext context, Analyzer analyzer, Planner planner,
             StatsErrorEstimator statsErrorEstimator) {
@@ -2074,58 +2047,12 @@ public class Coordinator {
         return location;
     }
 
-    private void computeScanRangeAssignmentByConsistentHash(
-            FileQueryScanNode scanNode,
-            final List<TScanRangeLocations> locations,
-            FragmentScanRangeAssignment assignment,
-            Map<TNetworkAddress, Long> assignedBytesPerHost,
-            Map<TNetworkAddress, Long> replicaNumPerHost) throws Exception {
-
-        Collection<Backend> aliveBEs = scanNode.getBackendPolicy().getBackends();
-        if (aliveBEs.isEmpty()) {
-            throw new UserException("No available backends");
-        }
-
-        int virtualNumber = Math.max(Math.min(512 / aliveBEs.size(), 32), 2);
-        ConsistentHash<TScanRangeLocations, Backend> consistentHash = new ConsistentHash<>(
-                Hashing.murmur3_128(), new ScanRangeHash(), new BackendHash(), aliveBEs, virtualNumber);
-        for (TScanRangeLocations scanRangeLocations : locations) {
-            TScanRangeLocation minLocation = scanRangeLocations.locations.get(0);
-            Backend backend = consistentHash.getNode(scanRangeLocations);
-            TNetworkAddress execHostPort = new TNetworkAddress(backend.getHost(), backend.getBePort());
-            this.addressToBackendID.put(execHostPort, backend.getId());
-            // Why only increase 1 in other implementations ?
-            if (scanRangeLocations.getScanRange().isSetExtScanRange()) {
-                for (TFileRangeDesc desc : scanRangeLocations.scan_range.ext_scan_range.file_scan_range.ranges) {
-                    assignedBytesPerHost.compute(execHostPort, (k, v) -> (v == null) ? desc.size : desc.size + v);
-                }
-            }
-            // Is replicaNumPerHost useful ?
-            replicaNumPerHost.computeIfPresent(minLocation.server, (k, v) -> v - 1);
-
-            Map<Integer, List<TScanRangeParams>> scanRanges = findOrInsert(assignment, execHostPort, new HashMap<>());
-            List<TScanRangeParams> scanRangeParamsList =
-                    findOrInsert(scanRanges, scanNode.getId().asInt(), new ArrayList<>());
-            TScanRangeParams scanRangeParams = new TScanRangeParams();
-            scanRangeParams.scan_range = scanRangeLocations.scan_range;
-            scanRangeParams.setVolumeId(minLocation.volume_id);
-            scanRangeParamsList.add(scanRangeParams);
-            updateScanRangeNumByScanRange(scanRangeParams);
-        }
-    }
-
     private void computeScanRangeAssignmentByScheduler(
             final ScanNode scanNode,
             final List<TScanRangeLocations> locations,
             FragmentScanRangeAssignment assignment,
             Map<TNetworkAddress, Long> assignedBytesPerHost,
             Map<TNetworkAddress, Long> replicaNumPerHost) throws Exception {
-        if (scanNode instanceof FileQueryScanNode) {
-            // Use consistent hash to assign the same scan range into the same backend among different queries
-            computeScanRangeAssignmentByConsistentHash(
-                    (FileQueryScanNode) scanNode, locations, assignment, assignedBytesPerHost, replicaNumPerHost);
-            return;
-        }
         for (TScanRangeLocations scanRangeLocations : locations) {
             Reference<Long> backendIdRef = new Reference<Long>();
             TScanRangeLocation minLocation = selectBackendsByRoundRobin(scanRangeLocations,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org