You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2023/04/12 17:03:02 UTC

[doris] 18/33: [Improvement](multi catalog)Move split size config to session variable (#18355)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch doris-for-zhongjin
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 203b310755d922b6bd8737cb03ce183ac0999867
Author: Jibing-Li <64...@users.noreply.github.com>
AuthorDate: Wed Apr 5 01:02:47 2023 +0800

    [Improvement](multi catalog)Move split size config to session variable (#18355)
    
    Move the split size config to a session variable. Previously it lived in the Config class, so users had to restart FE after changing it.
---
 .../src/main/java/org/apache/doris/common/Config.java    | 12 ------------
 .../src/main/java/org/apache/doris/planner/Splitter.java |  2 +-
 .../apache/doris/planner/external/FileSplitStrategy.java |  7 ++++---
 .../org/apache/doris/planner/external/HiveSplitter.java  | 11 ++++++-----
 .../apache/doris/planner/external/QueryScanProvider.java | 16 ++--------------
 .../org/apache/doris/planner/external/TVFSplitter.java   |  8 +++++---
 .../main/java/org/apache/doris/qe/SessionVariable.java   | 14 ++++++++++++++
 7 files changed, 32 insertions(+), 38 deletions(-)

diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index c3211a3818..75954dc408 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -1736,18 +1736,6 @@ public class Config extends ConfigBase {
     @ConfField(mutable = false, masterOnly = true)
     public static int backend_rpc_timeout_ms = 60000; // 1 min
 
-    @ConfField(mutable = true, masterOnly = false)
-    public static long file_scan_node_split_size = 256 * 1024 * 1024; // 256mb
-
-    @ConfField(mutable = true, masterOnly = false)
-    public static long file_scan_node_split_num = 128;
-
-    // 0 means use the block size in HDFS/S3 as split size.
-    // HDFS block size is 128MB, while S3 block size is 32MB.
-    // 32MB is too small for a S3 file split, so set 128MB as default split size.
-    @ConfField(mutable = true, masterOnly = false)
-    public static long file_split_size = 134217728;
-
     /**
      * If set to TRUE, FE will:
      * 1. divide BE into high load and low load(no mid load) to force triggering tablet scheduling;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/Splitter.java b/fe/fe-core/src/main/java/org/apache/doris/planner/Splitter.java
index a0312e0a36..5ad1034e61 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/Splitter.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/Splitter.java
@@ -23,7 +23,7 @@ import org.apache.doris.common.UserException;
 import java.util.List;
 
 public interface Splitter {
-    static final long DEFAULT_SPLIT_SIZE = 32 * 1024 * 1024; // 32mb
+    static final long DEFAULT_SPLIT_SIZE = 128 * 1024 * 1024; // 128MB
 
     List<Split> getSplits(List<Expr> exprs) throws UserException;
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileSplitStrategy.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileSplitStrategy.java
index 8fd7f2d16a..617fd67e0b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileSplitStrategy.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileSplitStrategy.java
@@ -17,8 +17,9 @@
 
 package org.apache.doris.planner.external;
 
-import org.apache.doris.common.Config;
-
+/**
+ * TODO: This class would be used later for split assignment.
+ */
 public class FileSplitStrategy {
     private long totalSplitSize;
     private int splitNum;
@@ -34,7 +35,7 @@ public class FileSplitStrategy {
     }
 
     public boolean hasNext() {
-        return totalSplitSize >= Config.file_scan_node_split_size || splitNum >= Config.file_scan_node_split_num;
+        return true;
     }
 
     public void next() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveSplitter.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveSplitter.java
index 9c8dec303b..5a3af95c6d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveSplitter.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveSplitter.java
@@ -23,7 +23,6 @@ import org.apache.doris.catalog.ListPartitionItem;
 import org.apache.doris.catalog.PartitionItem;
 import org.apache.doris.catalog.Type;
 import org.apache.doris.catalog.external.HMSExternalTable;
-import org.apache.doris.common.Config;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.Util;
 import org.apache.doris.datasource.HMSExternalCatalog;
@@ -34,6 +33,7 @@ import org.apache.doris.planner.ColumnRange;
 import org.apache.doris.planner.ListPartitionPrunerV2;
 import org.apache.doris.planner.Split;
 import org.apache.doris.planner.Splitter;
+import org.apache.doris.qe.ConnectContext;
 
 import com.google.common.base.Joiner;
 import com.google.common.collect.Lists;
@@ -174,13 +174,14 @@ public class HiveSplitter implements Splitter {
             }
             return splits.toArray(new FileSplit[splits.size()]);
         }
-        long splitSize = Config.file_split_size;
-        boolean useDefaultBlockSize = (splitSize <= 0);
+        long splitSize = ConnectContext.get().getSessionVariable().getFileSplitSize();
         while (locatedFileStatusRemoteIterator.hasNext()) {
             LocatedFileStatus status = locatedFileStatusRemoteIterator.next();
-            if (useDefaultBlockSize) {
-                splitSize = status.getBlockSize() > 0 ? status.getBlockSize() : DEFAULT_SPLIT_SIZE;
+            if (splitSize <= 0) {
+                splitSize = status.getBlockSize();
             }
+            // Min split size is DEFAULT_SPLIT_SIZE(128MB).
+            splitSize = splitSize > DEFAULT_SPLIT_SIZE ? splitSize : DEFAULT_SPLIT_SIZE;
             BlockLocation[] blockLocations = status.getBlockLocations();
             long length = status.getLen();
             long bytesRemaining;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/QueryScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/QueryScanProvider.java
index 5e1eee54ff..0fdfe0943f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/QueryScanProvider.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/QueryScanProvider.java
@@ -104,11 +104,9 @@ public abstract class QueryScanProvider implements FileScanProviderIf {
         } else if (locationType == TFileType.FILE_S3) {
             context.params.setProperties(locationProperties);
         }
-        TScanRangeLocations curLocations = newLocations(context.params, backendPolicy);
-
-        FileSplitStrategy fileSplitStrategy = new FileSplitStrategy();
 
         for (Split split : inputSplits) {
+            TScanRangeLocations curLocations = newLocations(context.params, backendPolicy);
             FileSplit fileSplit = (FileSplit) split;
             List<String> pathPartitionKeys = getPathPartitionKeys();
             List<String> partitionValuesFromPath = BrokerUtil.parseColumnsFromPath(fileSplit.getPath().toString(),
@@ -124,18 +122,8 @@ public abstract class QueryScanProvider implements FileScanProviderIf {
             LOG.debug("assign to backend {} with table split: {} ({}, {}), location: {}",
                     curLocations.getLocations().get(0).getBackendId(), fileSplit.getPath(), fileSplit.getStart(),
                     fileSplit.getLength(), Joiner.on("|").join(fileSplit.getHosts()));
-
-            fileSplitStrategy.update(fileSplit);
-            // Add a new location when it's can be split
-            if (fileSplitStrategy.hasNext()) {
-                scanRangeLocations.add(curLocations);
-                curLocations = newLocations(context.params, backendPolicy);
-                fileSplitStrategy.next();
-            }
-            this.inputFileSize += fileSplit.getLength();
-        }
-        if (curLocations.getScanRange().getExtScanRange().getFileScanRange().getRangesSize() > 0) {
             scanRangeLocations.add(curLocations);
+            this.inputFileSize += fileSplit.getLength();
         }
         LOG.debug("create #{} ScanRangeLocations cost: {} ms",
                 scanRangeLocations.size(), (System.currentTimeMillis() - start));
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFSplitter.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFSplitter.java
index 5929d545ec..5119f67f3b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFSplitter.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFSplitter.java
@@ -18,10 +18,10 @@
 package org.apache.doris.planner.external;
 
 import org.apache.doris.analysis.Expr;
-import org.apache.doris.common.Config;
 import org.apache.doris.common.UserException;
 import org.apache.doris.planner.Split;
 import org.apache.doris.planner.Splitter;
+import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.tablefunction.ExternalFileTableValuedFunction;
 import org.apache.doris.thrift.TBrokerFileStatus;
 
@@ -50,10 +50,12 @@ public class TVFSplitter implements Splitter {
             long fileLength = fileStatus.getSize();
             Path path = new Path(fileStatus.getPath());
             if (fileStatus.isSplitable) {
-                long splitSize = Config.file_split_size;
+                long splitSize = ConnectContext.get().getSessionVariable().getFileSplitSize();
                 if (splitSize <= 0) {
-                    splitSize = fileStatus.getBlockSize() > 0 ? fileStatus.getBlockSize() : DEFAULT_SPLIT_SIZE;
+                    splitSize = fileStatus.getBlockSize();
                 }
+                // Min split size is DEFAULT_SPLIT_SIZE(128MB).
+                splitSize = splitSize > DEFAULT_SPLIT_SIZE ? splitSize : DEFAULT_SPLIT_SIZE;
                 addFileSplits(path, fileLength, splitSize, splits);
             } else {
                 Split split = new FileSplit(path, 0, fileLength, fileLength, new String[0]);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index bcd483689b..c8719a4b5a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -295,6 +295,9 @@ public class SessionVariable implements Serializable, Writable {
 
     public static final String DRY_RUN_QUERY = "dry_run_query";
 
+    // Split size for ExternalFileScanNode. Default value 0 means use the block size of HDFS/S3.
+    public static final String FILE_SPLIT_SIZE = "file_split_size";
+
     public static final List<String> DEBUG_VARIABLES = ImmutableList.of(
             SKIP_DELETE_PREDICATE,
             SKIP_DELETE_BITMAP,
@@ -790,6 +793,9 @@ public class SessionVariable implements Serializable, Writable {
     @VariableMgr.VarAttr(name = DRY_RUN_QUERY, needForward = true)
     public boolean dryRunQuery = false;
 
+    @VariableMgr.VarAttr(name = FILE_SPLIT_SIZE, needForward = true)
+    public long fileSplitSize = 0;
+
     // If this fe is in fuzzy mode, then will use initFuzzyModeVariables to generate some variables,
     // not the default value set in the code.
     public void initFuzzyModeVariables() {
@@ -1363,6 +1369,14 @@ public class SessionVariable implements Serializable, Writable {
         return enableCboStatistics;
     }
 
+    public long getFileSplitSize() {
+        return fileSplitSize;
+    }
+
+    public void setFileSplitSize(long fileSplitSize) {
+        this.fileSplitSize = fileSplitSize;
+    }
+
     /**
      * getInsertVisibleTimeoutMs.
      **/


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org