Posted to commits@doris.apache.org by mo...@apache.org on 2022/12/16 01:41:49 UTC

[doris] branch master updated: [config](load) enable new load scan node by default (#14808)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 0e1e5a802b [config](load) enable new load scan node by default (#14808)
0e1e5a802b is described below

commit 0e1e5a802b3777ab5ae948a0b8d70bcc75da8302
Author: Mingyu Chen <mo...@163.com>
AuthorDate: Fri Dec 16 09:41:43 2022 +0800

    [config](load) enable new load scan node by default (#14808)
    
    Set the FE config `enable_new_load_scan_node` to true by default,
    so that all load tasks (broker load, stream load, routine load, insert into)
    use FileScanNode instead of BrokerScanNode to read data.
    
    1. Support loading parquet files in stream load with the new load scan node.
    2. Fix a bug where the new parquet reader could not read columns without a logical or converted type.
    3. Change the jsonb parser function to "jsonb_parse_error_to_null",
        so that if the input string is not valid JSON, null is written to the jsonb column in the load task.
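
    Point 3 in practice: with a nullable jsonb destination column, a row whose
    jsonb value is not valid JSON is loaded with NULL in that column instead of
    failing the task. A minimal sketch in the style of the jsonb regression
    suites below (table and data file names are hypothetical):

        streamLoad {
            table "tbl_jsonb_demo"   // hypothetical table with a nullable JSONB column
            file 'demo_jsonb.csv'    // hypothetical csv containing one invalid JSON row
            time 10000               // limit inflight 10s
            check { result, exception, startTime, endTime ->
                if (exception != null) {
                    throw exception
                }
                def json = parseJson(result)
                // the invalid row lands as NULL in the jsonb column,
                // so the load itself reports success
                assertEquals("success", json.Status.toLowerCase())
            }
        }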
---
 be/src/http/action/stream_load.cpp                 |  1 +
 be/src/util/jsonb_parser.h                         | 25 +++++---
 be/src/vec/exec/format/parquet/schema_desc.cpp     | 14 +++--
 be/src/vec/exec/scan/vfile_scanner.cpp             |  6 +-
 .../org/apache/doris/analysis/DataDescription.java | 27 ++++++--
 .../main/java/org/apache/doris/common/Config.java  |  2 +-
 .../org/apache/doris/load/BrokerFileGroup.java     |  2 +
 .../org/apache/doris/planner/LoadScanNode.java     |  5 +-
 .../apache/doris/planner/StreamLoadPlanner.java    | 14 ++++-
 .../planner/external/ExternalFileScanNode.java     | 21 ++++++-
 .../doris/planner/external/FileGroupInfo.java      | 15 +++++
 .../doris/planner/external/LoadScanProvider.java   |  3 +-
 .../apache/doris/service/FrontendServiceImpl.java  |  3 +-
 .../java/org/apache/doris/task/LoadTaskInfo.java   |  5 ++
 .../java/org/apache/doris/task/StreamLoadTask.java | 11 ++++
 .../doris/planner/StreamLoadPlannerTest.java       | 71 ----------------------
 .../apache/doris/utframe/TestWithFeService.java    |  8 ++-
 gensrc/thrift/FrontendService.thrift               |  1 +
 .../table_valued_function/test_hdfs_tvf.groovy     |  2 -
 .../suites/export_p0/test_outfile_parquet.groovy   |  2 +-
 .../external_catalog_p0/hive/test_hive_orc.groovy  | 27 --------
 .../hive/test_hive_other.groovy                    | 25 --------
 .../hive/test_hive_parquet.groovy                  | 28 ---------
 .../jsonb_p0/test_jsonb_load_and_function.groovy   | 26 +++++++-
 .../test_jsonb_load_unique_key_and_function.groovy |  2 +
 .../load_p0/broker_load/test_array_load.groovy     |  8 ---
 .../load_p0/broker_load/test_broker_load.groovy    | 21 -------
 ...n_column_exclude_schema_without_jsonpath.groovy |  2 -
 .../stream_load/load_json_null_to_nullable.groovy  |  6 --
 .../stream_load/load_json_with_jsonpath.groovy     |  7 ---
 .../load_p0/stream_load/test_hdfs_json_load.groovy |  6 --
 .../load_p0/stream_load/test_json_load.groovy      | 13 ----
 .../stream_load/test_txt_special_delimiter.groovy  |  7 ---
 .../test_streamload_perfomance.groovy              |  2 +-
 .../multi_catalog_query/hive_catalog_orc.groovy    | 25 --------
 .../hive_catalog_parquet.groovy                    | 25 --------
 36 files changed, 164 insertions(+), 304 deletions(-)

diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp
index 3b2b289146..5097de5c0d 100644
--- a/be/src/http/action/stream_load.cpp
+++ b/be/src/http/action/stream_load.cpp
@@ -408,6 +408,7 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req, StreamLoadContext*
         RETURN_IF_ERROR(file_sink->open());
         request.__isset.path = true;
         request.fileType = TFileType::FILE_LOCAL;
+        request.__set_file_size(ctx->body_bytes);
         ctx->body_sink = file_sink;
     }
     if (!http_req->header(HTTP_COLUMNS).empty()) {
diff --git a/be/src/util/jsonb_parser.h b/be/src/util/jsonb_parser.h
index c050fd305c..f4711f9a62 100644
--- a/be/src/util/jsonb_parser.h
+++ b/be/src/util/jsonb_parser.h
@@ -65,6 +65,7 @@
 #include "jsonb_document.h"
 #include "jsonb_error.h"
 #include "jsonb_writer.h"
+#include "string_parser.hpp"
 
 namespace doris {
 
@@ -894,8 +895,12 @@ private:
         }
 
         *pbuf = 0; // set null-terminator
-        int64_t val = strtol(num_buf_, NULL, 10);
-        if (errno == ERANGE) {
+        StringParser::ParseResult parse_result = StringParser::PARSE_SUCCESS;
+        int64_t val =
+                StringParser::string_to_int<int64_t>(num_buf_, pbuf - num_buf_, &parse_result);
+        if (parse_result != StringParser::PARSE_SUCCESS) {
+            VLOG_ROW << "debug string_to_int error for " << num_buf_ << " val=" << val
+                     << " parse_result=" << parse_result;
             err_ = JsonbErrType::E_DECIMAL_OVERFLOW;
             return false;
         }
@@ -950,7 +955,7 @@ private:
         }
 
         *pbuf = 0; // set null-terminator
-        return internConvertBufferToDouble();
+        return internConvertBufferToDouble(num_buf_, pbuf - num_buf_);
     }
 
     // parse the exponent part of a double number
@@ -990,15 +995,17 @@ private:
         }
 
         *pbuf = 0; // set null-terminator
-        return internConvertBufferToDouble();
+        return internConvertBufferToDouble(num_buf_, pbuf - num_buf_);
     }
 
     // convert the number string in the buffer to a double
-    bool internConvertBufferToDouble() {
-        double val = strtod(num_buf_, NULL);
-
-        if (errno == ERANGE) {
-            err_ = JsonbErrType::E_DOUBLE_OVERFLOW;
+    bool internConvertBufferToDouble(char* num_buf_, int len) {
+        StringParser::ParseResult parse_result = StringParser::PARSE_SUCCESS;
+        double val = StringParser::string_to_float<double>(num_buf_, len, &parse_result);
+        if (parse_result != StringParser::PARSE_SUCCESS) {
+            VLOG_ROW << "debug string_to_float error for " << num_buf_ << " val=" << val
+                     << " parse_result=" << parse_result;
+            err_ = JsonbErrType::E_DECIMAL_OVERFLOW;
             return false;
         }
 
diff --git a/be/src/vec/exec/format/parquet/schema_desc.cpp b/be/src/vec/exec/format/parquet/schema_desc.cpp
index 6d07886e89..2af4d40ea2 100644
--- a/be/src/vec/exec/format/parquet/schema_desc.cpp
+++ b/be/src/vec/exec/format/parquet/schema_desc.cpp
@@ -168,20 +168,24 @@ TypeDescriptor FieldDescriptor::get_doris_type(const tparquet::SchemaElement& ph
         switch (physical_schema.type) {
         case tparquet::Type::BOOLEAN:
             type.type = TYPE_BOOLEAN;
-            return type;
+            break;
         case tparquet::Type::INT32:
             type.type = TYPE_INT;
-            return type;
+            break;
         case tparquet::Type::INT64:
         case tparquet::Type::INT96:
             type.type = TYPE_BIGINT;
-            return type;
+            break;
         case tparquet::Type::FLOAT:
             type.type = TYPE_FLOAT;
-            return type;
+            break;
         case tparquet::Type::DOUBLE:
             type.type = TYPE_DOUBLE;
-            return type;
+            break;
+        case tparquet::Type::BYTE_ARRAY:
+        case tparquet::Type::FIXED_LEN_BYTE_ARRAY:
+            type.type = TYPE_STRING;
+            break;
         default:
             break;
         }
diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp
index 1ad6928799..fa146c5e28 100644
--- a/be/src/vec/exec/scan/vfile_scanner.cpp
+++ b/be/src/vec/exec/scan/vfile_scanner.cpp
@@ -210,7 +210,11 @@ Status VFileScanner::_init_src_block(Block* block) {
             data_type = DataTypeFactory::instance().create_data_type(it->second, true);
         }
         if (data_type == nullptr) {
-            return Status::NotSupported(fmt::format("Not support arrow type:{}", slot->col_name()));
+            return Status::NotSupported(
+                    fmt::format("Not support data type:{} for column: {}",
+                                (it == _name_to_col_type.end() ? slot->type().debug_string()
+                                                               : it->second.debug_string()),
+                                slot->col_name()));
         }
         MutableColumnPtr data_column = data_type->create_column();
         _src_block.insert(
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java
index 79a8cf7840..b67574e743 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java
@@ -222,9 +222,14 @@ public class DataDescription {
     public DataDescription(String tableName, LoadTaskInfo taskInfo) {
         this.tableName = tableName;
         this.partitionNames = taskInfo.getPartitions();
-        // Add a dummy path to just make analyze() happy.
-        // Stream load does not need this field.
-        this.filePaths = Lists.newArrayList("dummy");
+
+        if (!Strings.isNullOrEmpty(taskInfo.getPath())) {
+            this.filePaths = Lists.newArrayList(taskInfo.getPath());
+        } else {
+            // Add a dummy path to just make analyze() happy.
+            this.filePaths = Lists.newArrayList("dummy");
+        }
+
         this.fileFieldNames = taskInfo.getColumnExprDescs().getFileColNames();
         this.columnSeparator = taskInfo.getColumnSeparator();
         this.lineDelimiter = taskInfo.getLineDelimiter();
@@ -259,7 +264,20 @@ public class DataDescription {
                 // the compress type is saved in "compressType"
                 this.fileFormat = "csv";
             } else {
-                this.fileFormat = "json";
+                switch (type) {
+                    case FORMAT_ORC:
+                        this.fileFormat = "orc";
+                        break;
+                    case FORMAT_PARQUET:
+                        this.fileFormat = "parquet";
+                        break;
+                    case FORMAT_JSON:
+                        this.fileFormat = "json";
+                        break;
+                    default:
+                        this.fileFormat = "unknown";
+                        break;
+                }
             }
         }
         // get compress type
@@ -1019,3 +1037,4 @@ public class DataDescription {
         return toSql();
     }
 }
+
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
index e5c6789277..708c1e79d3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
@@ -1749,7 +1749,7 @@ public class Config extends ConfigBase {
      * Temp config, should be removed when new file scan node is ready.
      */
     @ConfField(mutable = true)
-    public static boolean enable_new_load_scan_node = false;
+    public static boolean enable_new_load_scan_node = true;
 
     /**
      * Max data version of backends serialize block.
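
Because the option is declared @ConfField(mutable = true), the legacy
BrokerScanNode path can still be restored at runtime, without an FE restart,
using the same statement the regression suites below are dropping:

    sql """ADMIN SET FRONTEND CONFIG ("enable_new_load_scan_node" = "false");"""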
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java b/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java
index a89f48e52b..497f0c2d23 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java
@@ -155,6 +155,7 @@ public class BrokerFileGroup implements Writable {
         this.deleteCondition = dataDescription.getDeleteCondition();
         this.mergeType = dataDescription.getMergeType();
         this.sequenceCol = dataDescription.getSequenceCol();
+        this.filePaths = dataDescription.getFilePaths();
     }
 
     // NOTE: DBLock will be held
@@ -598,3 +599,4 @@ public class BrokerFileGroup implements Writable {
         return fileGroup;
     }
 }
+
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/LoadScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/LoadScanNode.java
index 0f8eb77ffb..bf4c66dccb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/LoadScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/LoadScanNode.java
@@ -207,6 +207,8 @@ public abstract class LoadScanNode extends ScanNode {
                 expr.analyze(analyzer);
             }
 
+            // For jsonb type, use jsonb_parse_xxx to parse the src string into jsonb;
+            // if the input string is not valid JSON, the function returns null.
             PrimitiveType dstType = destSlotDesc.getType().getPrimitiveType();
             PrimitiveType srcType = expr.getType().getPrimitiveType();
             if (dstType == PrimitiveType.JSONB
@@ -217,7 +219,7 @@ public abstract class LoadScanNode extends ScanNode {
                 if (destSlotDesc.getIsNullable() || expr.isNullable()) {
                     nullable = "nullable";
                 }
-                String name = "jsonb_parse_" + nullable + "_error_to_invalid";
+                String name = "jsonb_parse_" + nullable + "_error_to_null";
                 expr = new FunctionCallExpr(name, args);
                 expr.analyze(analyzer);
             } else {
@@ -251,3 +253,4 @@ public abstract class LoadScanNode extends ScanNode {
         planNode.setBrokerScanNode(brokerScanNode);
     }
 }
+
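
Depending on slot nullability, the constructed name resolves to
jsonb_parse_nullable_error_to_null or jsonb_parse_notnull_error_to_null.
A quick way to observe the null-on-error behavior from SQL (a sketch, assuming
the nullable variant is directly callable, as the jsonb function suites below
exercise):

    sql """select jsonb_parse_nullable_error_to_null('{"k1":"v1"}'),
                  jsonb_parse_nullable_error_to_null('not valid json');"""
    // expected: the first expression returns the parsed JSONB value,
    // the second returns NULL instead of raising an error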
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
index 2a8b9631d5..6aa9b4c66d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
@@ -52,6 +52,7 @@ import org.apache.doris.task.LoadTaskInfo;
 import org.apache.doris.thrift.PaloInternalServiceVersion;
 import org.apache.doris.thrift.TBrokerFileStatus;
 import org.apache.doris.thrift.TExecPlanFragmentParams;
+import org.apache.doris.thrift.TFileType;
 import org.apache.doris.thrift.TLoadErrorHubInfo;
 import org.apache.doris.thrift.TNetworkAddress;
 import org.apache.doris.thrift.TPlanFragmentExecParams;
@@ -180,9 +181,15 @@ public class StreamLoadPlanner {
             fileGroup.parse(db, dataDescription);
             // 2. create dummy file status
             TBrokerFileStatus fileStatus = new TBrokerFileStatus();
-            fileStatus.setPath("");
-            fileStatus.setIsDir(false);
-            fileStatus.setSize(-1); // must set to -1, means stream.
+            if (taskInfo.getFileType() == TFileType.FILE_LOCAL) {
+                fileStatus.setPath(taskInfo.getPath());
+                fileStatus.setIsDir(false);
+                fileStatus.setSize(taskInfo.getFileSize()); // use the actual size of the spooled local file
+            } else {
+                fileStatus.setPath("");
+                fileStatus.setIsDir(false);
+                fileStatus.setSize(-1); // must be set to -1, meaning stream input
+            }
             fileScanNode.setLoadInfo(loadId, taskInfo.getTxnId(), destTable, BrokerDesc.createForStreamLoad(),
                     fileGroup, fileStatus, taskInfo.isStrictMode(), taskInfo.getFileType());
             scanNode = fileScanNode;
@@ -324,3 +331,4 @@ public class StreamLoadPlanner {
         return null;
     }
 }
+
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java
index 9d77a33278..4117f20539 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java
@@ -456,7 +456,25 @@ public class ExternalFileScanNode extends ExternalScanNode {
                 expr = new ArithmeticExpr(ArithmeticExpr.Operator.MULTIPLY, expr, new IntLiteral(-1));
                 expr.analyze(analyzer);
             }
-            expr = castToSlot(destSlotDesc, expr);
+
+            // For jsonb type, use jsonb_parse_xxx to parse the src string into jsonb;
+            // if the input string is not valid JSON, the function returns null.
+            PrimitiveType dstType = destSlotDesc.getType().getPrimitiveType();
+            PrimitiveType srcType = expr.getType().getPrimitiveType();
+            if (dstType == PrimitiveType.JSONB
+                    && (srcType == PrimitiveType.VARCHAR || srcType == PrimitiveType.STRING)) {
+                List<Expr> args = Lists.newArrayList();
+                args.add(expr);
+                String nullable = "notnull";
+                if (destSlotDesc.getIsNullable() || expr.isNullable()) {
+                    nullable = "nullable";
+                }
+                String name = "jsonb_parse_" + nullable + "_error_to_null";
+                expr = new FunctionCallExpr(name, args);
+                expr.analyze(analyzer);
+            } else {
+                expr = castToSlot(destSlotDesc, expr);
+            }
             params.putToExprOfDestSlot(destSlotDesc.getId().asInt(), expr.treeToThrift());
         }
         params.setDestSidToSrcSidWithoutTrans(destSidToSrcSidWithoutTrans);
@@ -550,3 +568,4 @@ public class ExternalFileScanNode extends ExternalScanNode {
 }
 
 
+
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileGroupInfo.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileGroupInfo.java
index eb99c88ed9..f3cb9a19f9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileGroupInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileGroupInfo.java
@@ -44,6 +44,7 @@ import org.apache.doris.thrift.TScanRangeLocation;
 import org.apache.doris.thrift.TScanRangeLocations;
 import org.apache.doris.thrift.TUniqueId;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -140,6 +141,10 @@ public class FileGroupInfo {
         return loadParallelism;
     }
 
+    public TFileType getFileType() {
+        return fileType;
+    }
+
     public String getExplainString(String prefix) {
         StringBuilder sb = new StringBuilder();
         sb.append("file scan\n");
@@ -303,6 +308,15 @@ public class FileGroupInfo {
             rangeDesc.setSize(rangeBytes);
             rangeDesc.setColumnsFromPath(columnsFromPath);
         } else {
+            // for stream load
+            if (getFileType() == TFileType.FILE_LOCAL) {
+                // When loading parquet via stream load, the body has been saved
+                // as a local file on the BE, so read it back as a local file.
+                Preconditions.checkState(fileGroup.getFilePaths().size() == 1);
+                rangeDesc.setPath(fileGroup.getFilePaths().get(0));
+                rangeDesc.setStartOffset(0);
+                rangeDesc.setSize(fileStatus.size);
+            }
             rangeDesc.setLoadId(loadId);
             rangeDesc.setSize(fileStatus.size);
         }
@@ -310,3 +324,4 @@ public class FileGroupInfo {
     }
 }
 
+
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java
index 7502fb0749..8f7ea2bb6e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java
@@ -111,7 +111,7 @@ public class LoadScanProvider implements FileScanProviderIf {
         TFileAttributes fileAttributes = new TFileAttributes();
         setFileAttributes(ctx.fileGroup, fileAttributes);
         params.setFileAttributes(fileAttributes);
-        params.setFileType(fileGroupInfo.getBrokerDesc().getFileType());
+        params.setFileType(fileGroupInfo.getFileType());
         ctx.params = params;
 
         initColumns(ctx, analyzer);
@@ -252,3 +252,4 @@ public class LoadScanProvider implements FileScanProviderIf {
         return fileGroupInfo.getTargetTable();
     }
 }
+
diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 556b95be08..bab10b3814 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -893,7 +893,7 @@ public class FrontendServiceImpl implements FrontendService.Iface {
         } catch (Throwable e) {
             LOG.warn("catch unknown result.", e);
             status.setStatusCode(TStatusCode.INTERNAL_ERROR);
-            status.addToErrorMsgs(Strings.nullToEmpty(e.getMessage()));
+            status.addToErrorMsgs(e.getClass().getSimpleName() + ": " + Strings.nullToEmpty(e.getMessage()));
             return result;
         }
         return result;
@@ -1224,3 +1224,4 @@ public class FrontendServiceImpl implements FrontendService.Iface {
         return result;
     }
 }
+
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java b/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java
index dd97602003..642597246f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java
@@ -71,6 +71,10 @@ public interface LoadTaskInfo {
 
     String getPath();
 
+    default long getFileSize() {
+        return 0;
+    }
+
     double getMaxFilterRatio();
 
     ImportColumnDescs getColumnExprDescs();
@@ -118,3 +122,4 @@ public interface LoadTaskInfo {
         }
     }
 }
+
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java
index 300a93ca19..59393b7d01 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java
@@ -68,6 +68,7 @@ public class StreamLoadTask implements LoadTaskInfo {
     private Separator lineDelimiter;
     private PartitionNames partitions;
     private String path;
+    private long fileSize = 0;
     private boolean negative;
     private boolean strictMode = false; // default is false
     private String timezone = TimeUtils.DEFAULT_TIME_ZONE;
@@ -159,6 +160,11 @@ public class StreamLoadTask implements LoadTaskInfo {
         return path;
     }
 
+    @Override
+    public long getFileSize() {
+        return fileSize;
+    }
+
     public boolean getNegative() {
         return negative;
     }
@@ -234,6 +240,7 @@ public class StreamLoadTask implements LoadTaskInfo {
         return !Strings.isNullOrEmpty(sequenceCol);
     }
 
+
     @Override
     public String getSequenceCol() {
         return sequenceCol;
@@ -249,6 +256,9 @@ public class StreamLoadTask implements LoadTaskInfo {
                 request.getFileType(), request.getFormatType(),
                 request.getCompressType());
         streamLoadTask.setOptionalFromTSLPutRequest(request);
+        if (request.isSetFileSize()) {
+            streamLoadTask.fileSize = request.getFileSize();
+        }
         return streamLoadTask;
     }
 
@@ -416,3 +426,4 @@ public class StreamLoadTask implements LoadTaskInfo {
         return maxFilterRatio;
     }
 }
+
diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/StreamLoadPlannerTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/StreamLoadPlannerTest.java
index d8ddcfe909..870dbbeb55 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/planner/StreamLoadPlannerTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/planner/StreamLoadPlannerTest.java
@@ -17,90 +17,19 @@
 
 package org.apache.doris.planner;
 
-import org.apache.doris.analysis.Analyzer;
 import org.apache.doris.analysis.CompoundPredicate;
 import org.apache.doris.analysis.ImportColumnsStmt;
 import org.apache.doris.analysis.ImportWhereStmt;
 import org.apache.doris.analysis.SqlParser;
 import org.apache.doris.analysis.SqlScanner;
-import org.apache.doris.catalog.Column;
-import org.apache.doris.catalog.Database;
-import org.apache.doris.catalog.OlapTable;
-import org.apache.doris.catalog.Partition;
-import org.apache.doris.catalog.PrimitiveType;
-import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.SqlParserUtils;
-import org.apache.doris.task.StreamLoadTask;
-import org.apache.doris.thrift.TFileFormatType;
-import org.apache.doris.thrift.TFileType;
-import org.apache.doris.thrift.TStreamLoadPutRequest;
-import org.apache.doris.thrift.TUniqueId;
 
-import com.google.common.collect.Lists;
-import mockit.Expectations;
-import mockit.Injectable;
-import mockit.Mocked;
 import org.junit.Assert;
 import org.junit.Test;
 
 import java.io.StringReader;
-import java.util.Arrays;
-import java.util.List;
 
 public class StreamLoadPlannerTest {
-    @Injectable
-    Database db;
-
-    @Injectable
-    OlapTable destTable;
-
-    @Mocked
-    StreamLoadScanNode scanNode;
-
-    @Mocked
-    OlapTableSink sink;
-
-    @Mocked
-    Partition partition;
-
-    @Test
-    public void testNormalPlan() throws UserException {
-        List<Column> columns = Lists.newArrayList();
-        Column c1 = new Column("c1", PrimitiveType.BIGINT, false);
-        columns.add(c1);
-        Column c2 = new Column("c2", PrimitiveType.BIGINT, true);
-        columns.add(c2);
-        new Expectations() {
-            {
-                destTable.getBaseSchema();
-                minTimes = 0;
-                result = columns;
-                destTable.getPartitions();
-                minTimes = 0;
-                result = Arrays.asList(partition);
-                scanNode.init((Analyzer) any);
-                minTimes = 0;
-                scanNode.getChildren();
-                minTimes = 0;
-                result = Lists.newArrayList();
-                scanNode.getId();
-                minTimes = 0;
-                result = new PlanNodeId(5);
-                partition.getId();
-                minTimes = 0;
-                result = 0;
-            }
-        };
-        TStreamLoadPutRequest request = new TStreamLoadPutRequest();
-        request.setTxnId(1);
-        request.setLoadId(new TUniqueId(2, 3));
-        request.setFileType(TFileType.FILE_STREAM);
-        request.setFormatType(TFileFormatType.FORMAT_CSV_PLAIN);
-        StreamLoadTask streamLoadTask = StreamLoadTask.fromTStreamLoadPutRequest(request);
-        StreamLoadPlanner planner = new StreamLoadPlanner(db, destTable, streamLoadTask);
-        planner.plan(streamLoadTask.getId());
-    }
-
     @Test
     public void testParseStmt() throws Exception {
         String sql = new String("COLUMNS (k1, k2, k3=abc(), k4=default_value())");
diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java
index 49b39f2a1f..3da84c62c3 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java
@@ -95,6 +95,7 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.Comparator;
+import java.util.ConcurrentModificationException;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
@@ -495,7 +496,12 @@ public abstract class TestWithFeService {
     }
 
     public void createTable(String sql) throws Exception {
-        createTables(sql);
+        try {
+            createTables(sql);
+        } catch (ConcurrentModificationException e) {
+            e.printStackTrace();
+            throw e;
+        }
     }
 
     public void dropTable(String table, boolean force) throws Exception {
diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift
index 9cf6ffb6ee..47de9fcdc3 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -544,6 +544,7 @@ struct TStreamLoadPutRequest {
     38: optional string header_type
     39: optional string hidden_columns
     40: optional PlanNodes.TFileCompressType compress_type
+    41: optional i64 file_size // only for stream load with parquet or orc
 }
 
 struct TStreamLoadPutResult {
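
End to end, the new field works like this: for a spooled stream-load body the
BE records the body size via request.__set_file_size(ctx->body_bytes) (see the
stream_load.cpp hunk above), and the FE reads it back through isSetFileSize()
to plan a FILE_LOCAL scan over exactly that many bytes. A sketch of a parquet
stream load in the regression-suite style (table and file names are
hypothetical; the 'format' header selects the input format):

    streamLoad {
        table "tbl_parquet_demo"   // hypothetical target table
        set 'format', 'parquet'    // read the streamed body with the parquet reader
        file 'demo.parquet'        // hypothetical parquet file
        time 10000                 // limit inflight 10s
    }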
diff --git a/regression-test/suites/correctness_p0/table_valued_function/test_hdfs_tvf.groovy b/regression-test/suites/correctness_p0/table_valued_function/test_hdfs_tvf.groovy
index 65996cca81..07b8363b0b 100644
--- a/regression-test/suites/correctness_p0/table_valued_function/test_hdfs_tvf.groovy
+++ b/regression-test/suites/correctness_p0/table_valued_function/test_hdfs_tvf.groovy
@@ -26,7 +26,6 @@ suite("test_hdfs_tvf") {
     String enabled = context.config.otherConfigs.get("enableHiveTest")
     if (enabled != null && enabled.equalsIgnoreCase("true")) {
         try {
-            sql """ADMIN SET FRONTEND CONFIG ("enable_new_load_scan_node" = "true");"""
 
             // test csv format
             uri = "${defaultFS}" + "/user/doris/preinstalled_data/csv_format_test/all_types.csv"
@@ -193,7 +192,6 @@ suite("test_hdfs_tvf") {
             assertTrue(result2[0][0] == 5, "Insert should update 5 rows")
             qt_insert """ select * from test_hdfs_tvf order by id; """
         } finally {
-            sql """ADMIN SET FRONTEND CONFIG ("enable_new_load_scan_node" = "false");"""
         }
     }
 }
diff --git a/regression-test/suites/export_p0/test_outfile_parquet.groovy b/regression-test/suites/export_p0/test_outfile_parquet.groovy
index 8b1944d2fb..9be0c8fdc4 100644
--- a/regression-test/suites/export_p0/test_outfile_parquet.groovy
+++ b/regression-test/suites/export_p0/test_outfile_parquet.groovy
@@ -22,7 +22,7 @@ import java.nio.file.Files
 import java.nio.file.Paths
 
 suite("test_outfile_parquet") {
-    def dbName = "test_query_db"
+    def dbName = "test_outfile_parquet"
     sql "CREATE DATABASE IF NOT EXISTS ${dbName}"
     sql "USE $dbName"
     StringBuilder strBuilder = new StringBuilder()
diff --git a/regression-test/suites/external_catalog_p0/hive/test_hive_orc.groovy b/regression-test/suites/external_catalog_p0/hive/test_hive_orc.groovy
index 7e1f8b78fd..2f8d610200 100644
--- a/regression-test/suites/external_catalog_p0/hive/test_hive_orc.groovy
+++ b/regression-test/suites/external_catalog_p0/hive/test_hive_orc.groovy
@@ -66,37 +66,12 @@ suite("test_hive_orc", "all_types") {
         qt_only_partition_col """select count(p1_col), count(p2_col) from orc_all_types;"""
     }
 
-    def set_be_config = { flag ->
-        String[][] backends = sql """ show backends; """
-        assertTrue(backends.size() > 0)
-        for (String[] backend in backends) {
-            StringBuilder setConfigCommand = new StringBuilder();
-            setConfigCommand.append("curl -X POST http://")
-            setConfigCommand.append(backend[2])
-            setConfigCommand.append(":")
-            setConfigCommand.append(backend[5])
-            setConfigCommand.append("/api/update_config?")
-            String command1 = setConfigCommand.toString() + "enable_new_load_scan_node=$flag"
-            logger.info(command1)
-            String command2 = setConfigCommand.toString() + "enable_new_file_scanner=$flag"
-            logger.info(command2)
-            def process1 = command1.execute()
-            int code = process1.waitFor()
-            assertEquals(code, 0)
-            def process2 = command2.execute()
-            code = process1.waitFor()
-            assertEquals(code, 0)
-        }
-    }
-
     String enabled = context.config.otherConfigs.get("enableHiveTest")
     if (enabled != null && enabled.equalsIgnoreCase("true")) {
         try {
             String hms_port = context.config.otherConfigs.get("hms_port")
             String catalog_name = "hive_test_orc"
             sql """admin set frontend config ("enable_multi_catalog" = "true")"""
-            sql """admin set frontend config ("enable_new_load_scan_node" = "true");"""
-            set_be_config.call('true')
             sql """drop catalog if exists ${catalog_name}"""
             sql """
             create catalog if not exists ${catalog_name} properties (
@@ -114,8 +89,6 @@ suite("test_hive_orc", "all_types") {
             only_partition_col()
 
         } finally {
-            sql """ADMIN SET FRONTEND CONFIG ("enable_new_load_scan_node" = "false");"""
-            set_be_config.call('false')
         }
     }
 }
diff --git a/regression-test/suites/external_catalog_p0/hive/test_hive_other.groovy b/regression-test/suites/external_catalog_p0/hive/test_hive_other.groovy
index f8401d70e9..71c184b872 100644
--- a/regression-test/suites/external_catalog_p0/hive/test_hive_other.groovy
+++ b/regression-test/suites/external_catalog_p0/hive/test_hive_other.groovy
@@ -50,36 +50,11 @@ suite("test_hive_other", "p0") {
     }
 
 
-    def set_be_config = { ->
-        String[][] backends = sql """ show backends; """
-        assertTrue(backends.size() > 0)
-        for (String[] backend in backends) {
-            // No need to set this config anymore, but leave this code sample here
-            // StringBuilder setConfigCommand = new StringBuilder();
-            // setConfigCommand.append("curl -X POST http://")
-            // setConfigCommand.append(backend[2])
-            // setConfigCommand.append(":")
-            // setConfigCommand.append(backend[5])
-            // setConfigCommand.append("/api/update_config?")
-            // String command1 = setConfigCommand.toString() + "enable_new_load_scan_node=true"
-            // logger.info(command1)
-            // String command2 = setConfigCommand.toString() + "enable_new_file_scanner=true"
-            // logger.info(command2)
-            // def process1 = command1.execute()
-            // int code = process1.waitFor()
-            // assertEquals(code, 0)
-            // def process2 = command2.execute()
-            // code = process1.waitFor()
-            // assertEquals(code, 0)
-        }
-    }
-
     String enabled = context.config.otherConfigs.get("enableHiveTest")
     if (enabled != null && enabled.equalsIgnoreCase("true")) {
         String hms_port = context.config.otherConfigs.get("hms_port")
         String hdfs_port = context.config.otherConfigs.get("hdfs_port")
         String catalog_name = "hive_test_other"
-        set_be_config.call()
 
         sql """admin set frontend config ("enable_multi_catalog" = "true")"""
         sql """drop catalog if exists ${catalog_name}"""
diff --git a/regression-test/suites/external_catalog_p0/hive/test_hive_parquet.groovy b/regression-test/suites/external_catalog_p0/hive/test_hive_parquet.groovy
index 8cb89baec5..d644699f00 100644
--- a/regression-test/suites/external_catalog_p0/hive/test_hive_parquet.groovy
+++ b/regression-test/suites/external_catalog_p0/hive/test_hive_parquet.groovy
@@ -139,38 +139,12 @@ suite("test_hive_parquet", "p0") {
     """
     }
 
-
-    def set_be_config = { flag ->
-        String[][] backends = sql """ show backends; """
-        assertTrue(backends.size() > 0)
-        for (String[] backend in backends) {
-            StringBuilder setConfigCommand = new StringBuilder();
-            setConfigCommand.append("curl -X POST http://")
-            setConfigCommand.append(backend[2])
-            setConfigCommand.append(":")
-            setConfigCommand.append(backend[5])
-            setConfigCommand.append("/api/update_config?")
-            String command1 = setConfigCommand.toString() + "enable_new_load_scan_node=$flag"
-            logger.info(command1)
-            String command2 = setConfigCommand.toString() + "enable_new_file_scanner=$flag"
-            logger.info(command2)
-            def process1 = command1.execute()
-            int code = process1.waitFor()
-            assertEquals(code, 0)
-            def process2 = command2.execute()
-            code = process1.waitFor()
-            assertEquals(code, 0)
-        }
-    }
-
     String enabled = context.config.otherConfigs.get("enableHiveTest")
     if (enabled != null && enabled.equalsIgnoreCase("true")) {
         try {
             String hms_port = context.config.otherConfigs.get("hms_port")
             String catalog_name = "hive_test_parquet"
             sql """admin set frontend config ("enable_multi_catalog" = "true")"""
-            sql """admin set frontend config ("enable_new_load_scan_node" = "true");"""
-            set_be_config.call('true')
             sql """drop catalog if exists ${catalog_name}"""
             sql """
             create catalog if not exists ${catalog_name} properties (
@@ -201,8 +175,6 @@ suite("test_hive_parquet", "p0") {
             q19()
             q20()
         } finally {
-            sql """ADMIN SET FRONTEND CONFIG ("enable_new_load_scan_node" = "false");"""
-            set_be_config.call('false')
         }
     }
 }
diff --git a/regression-test/suites/jsonb_p0/test_jsonb_load_and_function.groovy b/regression-test/suites/jsonb_p0/test_jsonb_load_and_function.groovy
index 0fab95d4ac..e30772ee73 100644
--- a/regression-test/suites/jsonb_p0/test_jsonb_load_and_function.groovy
+++ b/regression-test/suites/jsonb_p0/test_jsonb_load_and_function.groovy
@@ -15,6 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
+import org.codehaus.groovy.runtime.IOGroovyMethods
+
 suite("test_jsonb_load_and_function", "p0") {
     // define a sql table
     def testTable = "tbl_test_jsonb"
@@ -35,12 +37,12 @@ suite("test_jsonb_load_and_function", "p0") {
         """
 
     // load the jsonb data from csv file
-    // fail by default for invalid data rows
     streamLoad {
         table testTable
         
         file dataFile // import csv file
         time 10000 // limit inflight 10s
+        set 'strict_mode', 'true'
 
         // If a check callback is declared, the default check condition is ignored,
         // so you must check all conditions yourself.
@@ -50,12 +52,23 @@ suite("test_jsonb_load_and_function", "p0") {
             }
             log.info("Stream load result: ${result}".toString())
             def json = parseJson(result)
+
+            StringBuilder sb = new StringBuilder()
+            sb.append("curl -X GET " + json.ErrorURL)
+            String command = sb.toString()
+            def process = command.execute()
+            def code = process.waitFor()
+            def err = IOGroovyMethods.getText(new BufferedReader(new InputStreamReader(process.getErrorStream())))
+            def out = process.getText()
+            log.info("error result: " + out)
+
             assertEquals("fail", json.Status.toLowerCase())
             assertEquals("[INTERNAL_ERROR]too many filtered rows", json.Message)
             assertEquals(25, json.NumberTotalRows)
             assertEquals(18, json.NumberLoadedRows)
             assertEquals(7, json.NumberFilteredRows)
             assertTrue(json.LoadBytes > 0)
+            log.info("url: " + json.ErrorURL)
         }
     }
 
@@ -68,6 +81,7 @@ suite("test_jsonb_load_and_function", "p0") {
         set 'max_filter_ratio', '0.3'
         file dataFile // import csv file
         time 10000 // limit inflight 10s
+        set 'strict_mode', 'true'
 
         // If a check callback is declared, the default check condition is ignored,
         // so you must check all conditions yourself.
@@ -77,6 +91,16 @@ suite("test_jsonb_load_and_function", "p0") {
             }
             log.info("Stream load result: ${result}".toString())
             def json = parseJson(result)
+
+            StringBuilder sb = new StringBuilder()
+            sb.append("curl -X GET " + json.ErrorURL)
+            String command = sb.toString()
+            def process = command.execute()
+            def code = process.waitFor()
+            def err = IOGroovyMethods.getText(new BufferedReader(new InputStreamReader(process.getErrorStream())))
+            def out = process.getText()
+            log.info("error result: " + out)
+
             assertEquals("success", json.Status.toLowerCase())
             assertEquals(25, json.NumberTotalRows)
             assertEquals(18, json.NumberLoadedRows)
diff --git a/regression-test/suites/jsonb_p0/test_jsonb_load_unique_key_and_function.groovy b/regression-test/suites/jsonb_p0/test_jsonb_load_unique_key_and_function.groovy
index 76591c6eb6..b10db1a791 100644
--- a/regression-test/suites/jsonb_p0/test_jsonb_load_unique_key_and_function.groovy
+++ b/regression-test/suites/jsonb_p0/test_jsonb_load_unique_key_and_function.groovy
@@ -41,6 +41,7 @@ suite("test_jsonb_unique_load_and_function", "p0") {
         
         file dataFile // import csv file
         time 10000 // limit inflight 10s
+        set 'strict_mode', 'true'
 
         // If a check callback is declared, the default check condition is ignored,
         // so you must check all conditions yourself.
@@ -68,6 +69,7 @@ suite("test_jsonb_unique_load_and_function", "p0") {
         set 'max_filter_ratio', '0.3'
         file dataFile // import csv file
         time 10000 // limit inflight 10s
+        set 'strict_mode', 'true'
 
         // If a check callback is declared, the default check condition is ignored,
         // so you must check all conditions yourself.
diff --git a/regression-test/suites/load_p0/broker_load/test_array_load.groovy b/regression-test/suites/load_p0/broker_load/test_array_load.groovy
index 7f5d109952..271050f3ca 100644
--- a/regression-test/suites/load_p0/broker_load/test_array_load.groovy
+++ b/regression-test/suites/load_p0/broker_load/test_array_load.groovy
@@ -202,13 +202,6 @@ suite("test_array_load", "load_p0") {
 
     try {
         for ( i in 0..1 ) {
-            // should be deleted after new_load_scan is ready
-            if (i == 1) {
-                sql """ADMIN SET FRONTEND CONFIG ("enable_new_load_scan_node" = "false");"""
-            } else {
-                sql """ADMIN SET FRONTEND CONFIG ("enable_new_load_scan_node" = "true");"""
-            }
-
             // case1: import array data in json format and enable vectorized engine
             try {
                 sql "DROP TABLE IF EXISTS ${testTable}"
@@ -280,7 +273,6 @@ suite("test_array_load", "load_p0") {
             }
         }
     } finally {
-        try_sql("""ADMIN SET FRONTEND CONFIG ("enable_new_load_scan_node" = "false");""")
     }
 
 
diff --git a/regression-test/suites/load_p0/broker_load/test_broker_load.groovy b/regression-test/suites/load_p0/broker_load/test_broker_load.groovy
index e1f16676dd..e67397e821 100644
--- a/regression-test/suites/load_p0/broker_load/test_broker_load.groovy
+++ b/regression-test/suites/load_p0/broker_load/test_broker_load.groovy
@@ -192,28 +192,8 @@ suite("test_broker_load", "p0") {
         logger.info("Submit load with lable: $uuid, table: $table, path: $path")
     }
 
-    def set_be_config = { flag->
-        String[][] backends = sql """ show backends; """
-        assertTrue(backends.size() > 0)
-        for (String[] backend in backends) {
-            // No need to set this config anymore, but leave this code sample here
-            // StringBuilder setConfigCommand = new StringBuilder();
-            // setConfigCommand.append("curl -X POST http://")
-            // setConfigCommand.append(backend[2])
-            // setConfigCommand.append(":")
-            // setConfigCommand.append(backend[5])
-            // setConfigCommand.append("/api/update_config?")
-            // String command1 = setConfigCommand.toString() + "enable_new_load_scan_node=$flag"
-            // logger.info(command1)
-            // def process1 = command1.execute()
-            // int code = process1.waitFor()
-            // assertEquals(code, 0)
-        }
-    }
-
     if (enabled != null && enabled.equalsIgnoreCase("true")) {
         def uuids = []
-        set_be_config.call('true')
         try {
             def i = 0
             for (String table in tables) {
@@ -258,7 +238,6 @@ suite("test_broker_load", "p0") {
             order_qt_parquet_s3_case9 """ select * from parquet_s3_case9"""
 
         } finally {
-            set_be_config.call('false')
             for (String table in tables) {
                 sql new File("""${context.file.parent}/ddl/${table}_drop.sql""").text
             }
diff --git a/regression-test/suites/load_p0/stream_load/load_json_column_exclude_schema_without_jsonpath.groovy b/regression-test/suites/load_p0/stream_load/load_json_column_exclude_schema_without_jsonpath.groovy
index 760af3344e..d00316c37a 100644
--- a/regression-test/suites/load_p0/stream_load/load_json_column_exclude_schema_without_jsonpath.groovy
+++ b/regression-test/suites/load_p0/stream_load/load_json_column_exclude_schema_without_jsonpath.groovy
@@ -51,8 +51,6 @@ suite("test_load_json_column_exclude_schema_without_jsonpath", "p0") {
 
     def load_array_data = {new_json_reader_flag, table_name, strip_flag, read_flag, format_flag, exprs, json_paths, 
                             json_root, where_expr, fuzzy_flag, column_sep, file_name ->
-        // should be deleted after new_load_scan is ready
-        sql """ADMIN SET FRONTEND CONFIG ("enable_new_load_scan_node" = "${new_json_reader_flag}");"""
 
         // load the json data
         streamLoad {
diff --git a/regression-test/suites/load_p0/stream_load/load_json_null_to_nullable.groovy b/regression-test/suites/load_p0/stream_load/load_json_null_to_nullable.groovy
index f934c038a2..be0baf42b8 100644
--- a/regression-test/suites/load_p0/stream_load/load_json_null_to_nullable.groovy
+++ b/regression-test/suites/load_p0/stream_load/load_json_null_to_nullable.groovy
@@ -42,9 +42,6 @@ suite("test_load_json_null_to_nullable", "p0") {
 
     def load_array_data = {new_json_reader_flag, table_name, strip_flag, read_flag, format_flag, exprs, json_paths, 
                             json_root, where_expr, fuzzy_flag, column_sep, file_name ->
-        // should be deleted after new_load_scan is ready
-        sql """ADMIN SET FRONTEND CONFIG ("enable_new_load_scan_node" = "${new_json_reader_flag}");"""
-
         // load the json data
         streamLoad {
             table table_name
@@ -77,9 +74,6 @@ suite("test_load_json_null_to_nullable", "p0") {
                 assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0)
             }
         }
-
-        // should be deleted after new_load_scan is ready
-        sql """ADMIN SET FRONTEND CONFIG ("enable_new_load_scan_node" = "false");"""
     }
 
     def check_data_correct = {table_name ->
diff --git a/regression-test/suites/load_p0/stream_load/load_json_with_jsonpath.groovy b/regression-test/suites/load_p0/stream_load/load_json_with_jsonpath.groovy
index 02ffd808e2..f48b41be79 100644
--- a/regression-test/suites/load_p0/stream_load/load_json_with_jsonpath.groovy
+++ b/regression-test/suites/load_p0/stream_load/load_json_with_jsonpath.groovy
@@ -42,10 +42,6 @@ suite("test_load_json_with_jsonpath", "p0") {
 
     def load_array_data = {new_json_reader_flag, table_name, strip_flag, read_flag, format_flag, exprs, json_paths,
                             json_root, where_expr, fuzzy_flag, column_sep, file_name ->
-        
-        // should be deleted after new_load_scan is ready
-        sql """ADMIN SET FRONTEND CONFIG ("enable_new_load_scan_node" = "${new_json_reader_flag}");"""
-
         // load the json data
         streamLoad {
             table table_name
@@ -78,9 +74,6 @@ suite("test_load_json_with_jsonpath", "p0") {
                 assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0)
             }
         }
-
-        // should be deleted after new_load_scan is ready
-        sql """ADMIN SET FRONTEND CONFIG ("enable_new_load_scan_node" = "false");"""
     }
 
     def check_data_correct = {table_name ->
diff --git a/regression-test/suites/load_p0/stream_load/test_hdfs_json_load.groovy b/regression-test/suites/load_p0/stream_load/test_hdfs_json_load.groovy
index 5c79cf1b97..6565357e96 100644
--- a/regression-test/suites/load_p0/stream_load/test_hdfs_json_load.groovy
+++ b/regression-test/suites/load_p0/stream_load/test_hdfs_json_load.groovy
@@ -44,9 +44,6 @@ suite("test_hdfs_json_load", "p0") {
 
     def load_from_hdfs1 = {new_json_reader_flag, strip_flag, fuzzy_flag, testTablex, label, fileName,
                             fsPath, hdfsUser, exprs, jsonpaths, json_root, columns_parameter, where ->
-        // should be delete after new_load_scan is ready
-        sql """ADMIN SET FRONTEND CONFIG ("enable_new_load_scan_node" = "${new_json_reader_flag}");"""
-        
         def hdfsFilePath = "${fsPath}/user/doris/preinstalled_data/json_format_test/${fileName}"
         def result1= sql """
                         LOAD LABEL ${label} (
@@ -76,9 +73,6 @@ suite("test_hdfs_json_load", "p0") {
         assertTrue(result1.size() == 1)
         assertTrue(result1[0].size() == 1)
         assertTrue(result1[0][0] == 0, "Query OK, 0 rows affected")
-
-        // should be delete after new_load_scan is ready
-        sql """ADMIN SET FRONTEND CONFIG ("enable_new_load_scan_node" = "false");"""
     }
 
     def check_load_result = {checklabel, testTablex ->
diff --git a/regression-test/suites/load_p0/stream_load/test_json_load.groovy b/regression-test/suites/load_p0/stream_load/test_json_load.groovy
index e066467e3e..513d3e14dd 100644
--- a/regression-test/suites/load_p0/stream_load/test_json_load.groovy
+++ b/regression-test/suites/load_p0/stream_load/test_json_load.groovy
@@ -116,8 +116,6 @@ suite("test_json_load", "p0") {
     
     def load_json_data = {new_json_reader_flag, label, strip_flag, read_flag, format_flag, exprs, json_paths, 
                         json_root, where_expr, fuzzy_flag, file_name, ignore_failure=false ->
-        // should be delete after new_load_scan is ready
-        sql """ADMIN SET FRONTEND CONFIG ("enable_new_load_scan_node" = "${new_json_reader_flag}");"""
         
         // load the json data
         streamLoad {
@@ -150,9 +148,6 @@ suite("test_json_load", "p0") {
                 assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0)
             }
         }
-
-        // should be deleted after new_load_scan is ready
-        sql """ADMIN SET FRONTEND CONFIG ("enable_new_load_scan_node" = "false");"""
     }
     
     def load_from_hdfs1 = {testTablex, label, hdfsFilePath, format, brokerName, hdfsUser, hdfsPasswd ->
@@ -529,8 +524,6 @@ suite("test_json_load", "p0") {
     try {
         sql "DROP TABLE IF EXISTS ${testTable}"
         create_test_table3.call(testTable)
-        // should be delete after new_load_scan is ready
-        sql """ADMIN SET FRONTEND CONFIG ("enable_new_load_scan_node" = "false");"""
         // load the json data
         streamLoad {
             table "${testTable}"
@@ -557,16 +550,12 @@ suite("test_json_load", "p0") {
                 assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0)
             }
         }
-        // should be deleted after new_load_scan is ready
-        sql """ADMIN SET FRONTEND CONFIG ("enable_new_load_scan_node" = "false");"""
         sql "sync"
         qt_select13 "select * from ${testTable} order by id"
 
 
         sql "DROP TABLE IF EXISTS ${testTable}"
         create_test_table3.call(testTable)
-        // should be delete after new_load_scan is ready
-        sql """ADMIN SET FRONTEND CONFIG ("enable_new_load_scan_node" = "true");"""
         // load the json data
         streamLoad {
             table "${testTable}"
@@ -593,8 +582,6 @@ suite("test_json_load", "p0") {
                 assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0)
             }
         }
-        // should be deleted after new_load_scan is ready
-        sql """ADMIN SET FRONTEND CONFIG ("enable_new_load_scan_node" = "false");"""
         sql "sync"
         qt_select13 "select * from ${testTable} order by id"
 
diff --git a/regression-test/suites/load_p0/stream_load/test_txt_special_delimiter.groovy b/regression-test/suites/load_p0/stream_load/test_txt_special_delimiter.groovy
index fff343078b..0e762f31d3 100644
--- a/regression-test/suites/load_p0/stream_load/test_txt_special_delimiter.groovy
+++ b/regression-test/suites/load_p0/stream_load/test_txt_special_delimiter.groovy
@@ -31,13 +31,6 @@ suite("test_txt_special_delimiter", "p0") {
         PROPERTIES ("replication_allocation" = "tag.location.default: 1");
     """
     for ( i in 0..1 ) {
-        // should be deleted after new_load_scan is ready
-        if (i == 1) {
-            sql """ADMIN SET FRONTEND CONFIG ("enable_new_load_scan_node" = "false");"""
-        } else {
-            sql """ADMIN SET FRONTEND CONFIG ("enable_new_load_scan_node" = "true");"""
-        }
-
         // test special_delimiter success
         streamLoad {
             table "${tableName}"
diff --git a/regression-test/suites/performance_p0/test_streamload_perfomance.groovy b/regression-test/suites/performance_p0/test_streamload_perfomance.groovy
index ef7e689e1d..adb0cafb87 100644
--- a/regression-test/suites/performance_p0/test_streamload_perfomance.groovy
+++ b/regression-test/suites/performance_p0/test_streamload_perfomance.groovy
@@ -37,7 +37,7 @@ suite("test_streamload_perfomance") {
 
         streamLoad {
             table tableName
-            time 5000
+            time 10000
             inputIterator rowIt
         }
     } finally {
diff --git a/regression-test/suites/tpch_sf1_p0/multi_catalog_query/hive_catalog_orc.groovy b/regression-test/suites/tpch_sf1_p0/multi_catalog_query/hive_catalog_orc.groovy
index 13b12e0027..e2c2cf8b1e 100644
--- a/regression-test/suites/tpch_sf1_p0/multi_catalog_query/hive_catalog_orc.groovy
+++ b/regression-test/suites/tpch_sf1_p0/multi_catalog_query/hive_catalog_orc.groovy
@@ -797,35 +797,10 @@ order by
         """
     }
 
-    def set_be_config = { ->
-        String[][] backends = sql """ show backends; """
-        assertTrue(backends.size() > 0)
-        for (String[] backend in backends) {
-            // No need to set this config anymore, but leave this code sample here
-            // StringBuilder setConfigCommand = new StringBuilder();
-            // setConfigCommand.append("curl -X POST http://")
-            // setConfigCommand.append(backend[2])
-            // setConfigCommand.append(":")
-            // setConfigCommand.append(backend[5])
-            // setConfigCommand.append("/api/update_config?")
-            // String command1 = setConfigCommand.toString() + "enable_new_load_scan_node=true"
-            // logger.info(command1)
-            // String command2 = setConfigCommand.toString() + "enable_new_file_scanner=true"
-            // logger.info(command2)
-            // def process1 = command1.execute()
-            // int code = process1.waitFor()
-            // assertEquals(code, 0)
-            // def process2 = command2.execute()
-            // code = process1.waitFor()
-            // assertEquals(code, 0)
-        }
-    }
-
     String enabled = context.config.otherConfigs.get("enableHiveTest")
     if (enabled != null && enabled.equalsIgnoreCase("true")) {
         String hms_port = context.config.otherConfigs.get("hms_port")
         String catalog_name = "test_catalog_hive_orc"
-        set_be_config.call()
 
         sql """admin set frontend config ("enable_multi_catalog" = "true")"""
         sql """drop catalog if exists ${catalog_name}"""
diff --git a/regression-test/suites/tpch_sf1_p0/multi_catalog_query/hive_catalog_parquet.groovy b/regression-test/suites/tpch_sf1_p0/multi_catalog_query/hive_catalog_parquet.groovy
index ce36a181a4..01530b738a 100644
--- a/regression-test/suites/tpch_sf1_p0/multi_catalog_query/hive_catalog_parquet.groovy
+++ b/regression-test/suites/tpch_sf1_p0/multi_catalog_query/hive_catalog_parquet.groovy
@@ -797,35 +797,10 @@ order by
         """
     }
 
-    def set_be_config = { ->
-        String[][] backends = sql """ show backends; """
-        assertTrue(backends.size() > 0)
-        for (String[] backend in backends) {
-            // No need to set this config anymore, but leave this code sample here
-            // StringBuilder setConfigCommand = new StringBuilder();
-            // setConfigCommand.append("curl -X POST http://")
-            // setConfigCommand.append(backend[2])
-            // setConfigCommand.append(":")
-            // setConfigCommand.append(backend[5])
-            // setConfigCommand.append("/api/update_config?")
-            // String command1 = setConfigCommand.toString() + "enable_new_load_scan_node=true"
-            // logger.info(command1)
-            // String command2 = setConfigCommand.toString() + "enable_new_file_scanner=true"
-            // logger.info(command2)
-            // def process1 = command1.execute()
-            // int code = process1.waitFor()
-            // assertEquals(code, 0)
-            // def process2 = command2.execute()
-            // code = process1.waitFor()
-            // assertEquals(code, 0)
-        }
-    }
-
     String enabled = context.config.otherConfigs.get("enableHiveTest")
     if (enabled != null && enabled.equalsIgnoreCase("true")) {
         String hms_port = context.config.otherConfigs.get("hms_port")
         String catalog_name = "test_catalog_hive_parquet"
-        set_be_config.call()
 
         sql """admin set frontend config ("enable_multi_catalog" = "true")"""
         sql """drop catalog if exists ${catalog_name}"""

