Posted to commits@doris.apache.org by mo...@apache.org on 2021/01/24 02:10:12 UTC

[incubator-doris] branch master updated: [Load Parallel][1/3] Broker Load supports setting the load parallelism (#5277)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 7e61400  [Load Parallel][1/3] Broker Load supports setting the load parallelism (#5277)
7e61400 is described below

commit 7e61400e3c82ed0e4aad6824873d0fb113ec1a9e
Author: Mingyu Chen <mo...@gmail.com>
AuthorDate: Sun Jan 24 10:09:53 2021 +0800

    [Load Parallel][1/3] Broker Load supports setting the load parallelism (#5277)
    
    * [Load] Broker Load supports setting the load parallelism
    
    Similar to the parallel_fragment_exec_instance_num parameter,
    this property allows the user to set the parallelism of the load
    execution plan on a single node when a broker load is submitted.
    
    e.g.:
    ```
    ...
    properties (
        "load_parallelism" = "4",
        ...
    )
    ```
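    
    For context, a complete broker load statement using this property might
    look like the following sketch (database, table, file path, and broker
    name are placeholders, not taken from this PR):
    ```
    LOAD LABEL example_db.example_label
    (
        DATA INFILE("hdfs://host:port/path/to/file")
        INTO TABLE example_tbl
    )
    WITH BROKER "broker_name"
    PROPERTIES
    (
        "timeout" = "3600",
        "load_parallelism" = "4"
    );
    ```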
    
    This parameter currently only wires the parallelism setting through the
    load execution plan; it does not yet significantly improve load speed.
    The actual speedup will be completed in subsequent code submissions,
    and documentation will be added along with them.
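    
    To make the current effect concrete: in BrokerScanNode (diff below), the
    number of scan instances is derived from the total bytes to load and is
    now capped by load_parallelism * (number of backends) rather than by the
    backend count alone; the sink fragment's parallel exec num is also set to
    load_parallelism (PlanFragment.setParallelExecNum). A simplified,
    self-contained sketch of the instance calculation (the helper name and
    parameters are illustrative, not from this PR):
    ```
    // Hypothetical standalone version of the BrokerScanNode calculation.
    static int computeNumInstances(long totalBytes, int numBackends, int loadParallelism,
                                   long minBytesPerScanner, int maxBrokerConcurrency) {
        // One instance per Config.min_bytes_per_broker_scanner worth of input data ...
        int numInstances = (int) (totalBytes / minBytesPerScanner);
        // ... capped by the total load parallelism across all backends ...
        numInstances = Math.min(loadParallelism * numBackends, numInstances);
        // ... and by Config.max_broker_concurrency, with at least one instance.
        numInstances = Math.min(numInstances, maxBrokerConcurrency);
        return Math.max(1, numInstances);
    }
    ```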
    
    This PR also updates the FE meta version.
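    
    The meta version bump (VERSION_94) is needed because job properties are
    now persisted as a single string map and re-parsed through the typed
    parsers in LoadStmt.PROPERTIES_MAP, instead of being written one typed
    field at a time. A minimal sketch of the new on-disk format, assuming
    Doris's Text writable and Guava's Maps (helper names are illustrative;
    the real logic is in LoadJob.write()/readFields() below):
    ```
    import org.apache.doris.common.io.Text;
    import com.google.common.collect.Maps;
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    import java.util.Map;

    class JobPropertiesFormatSketch {
        // Write: the map size, then each property as (key, stringified value).
        static void writeProperties(DataOutput out, Map<String, Object> jobProperties) throws IOException {
            out.writeInt(jobProperties.size());
            for (Map.Entry<String, Object> entry : jobProperties.entrySet()) {
                Text.writeString(out, entry.getKey());
                Text.writeString(out, String.valueOf(entry.getValue()));
            }
        }

        // Read: rebuild the string map; the caller then re-parses each value
        // with the per-property parser registered in LoadStmt.PROPERTIES_MAP
        // (via setJobProperties), restoring the typed jobProperties map.
        static Map<String, String> readProperties(DataInput in) throws IOException {
            int size = in.readInt();
            Map<String, String> properties = Maps.newHashMap();
            for (int i = 0; i < size; i++) {
                properties.put(Text.readString(in), Text.readString(in));
            }
            return properties;
        }
    }
    ```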
---
 be/src/olap/version_graph.cpp                      |   2 +-
 .../java/org/apache/doris/analysis/LoadStmt.java   |  83 ++++----
 .../org/apache/doris/common/FeMetaVersion.java     |   4 +-
 .../src/main/java/org/apache/doris/load/Load.java  |   4 -
 .../main/java/org/apache/doris/load/LoadJob.java   |   2 +-
 .../apache/doris/load/loadv2/BrokerLoadJob.java    |  24 +--
 .../org/apache/doris/load/loadv2/BulkLoadJob.java  |  12 +-
 .../apache/doris/load/loadv2/InsertLoadJob.java    |   8 +-
 .../java/org/apache/doris/load/loadv2/LoadJob.java | 235 ++++++++++++++-------
 .../apache/doris/load/loadv2/LoadLoadingTask.java  |   6 +-
 .../doris/load/loadv2/LoadingTaskPlanner.java      |   7 +-
 .../org/apache/doris/load/loadv2/MiniLoadJob.java  |  20 +-
 .../org/apache/doris/load/loadv2/SparkLoadJob.java |  13 +-
 .../doris/load/loadv2/SparkLoadPendingTask.java    |   4 +-
 .../org/apache/doris/planner/BrokerScanNode.java   |  17 +-
 .../org/apache/doris/planner/LoadScanNode.java     |   1 -
 .../org/apache/doris/planner/PlanFragment.java     |   6 +
 .../main/java/org/apache/doris/qe/DdlExecutor.java |   3 +-
 .../doris/load/loadv2/BrokerLoadJobTest.java       |  20 +-
 .../org/apache/doris/load/loadv2/LoadJobTest.java  |  16 +-
 .../apache/doris/load/loadv2/LoadManagerTest.java  |   8 +-
 .../apache/doris/load/loadv2/SparkLoadJobTest.java |   4 +-
 .../apache/doris/persist/LoadJobV2PersistTest.java | 105 +++++++++
 23 files changed, 396 insertions(+), 208 deletions(-)

diff --git a/be/src/olap/version_graph.cpp b/be/src/olap/version_graph.cpp
index fb50b86..4873c98 100644
--- a/be/src/olap/version_graph.cpp
+++ b/be/src/olap/version_graph.cpp
@@ -354,7 +354,7 @@ PathVersionListSharedPtr TimestampedVersionTracker::fetch_and_delete_path_by_id(
         return nullptr;
     }
 
-    LOG(INFO) << _get_current_path_map_str();
+    VLOG_NOTICE << _get_current_path_map_str();
     PathVersionListSharedPtr ptr = fetch_path_version_by_id(path_id);
 
     _stale_version_path_map.erase(path_id);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java
index cfca472..8ebee50 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java
@@ -29,16 +29,17 @@ import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.PrintableMap;
 import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.load.EtlJobType;
-import org.apache.doris.load.Load;
 import org.apache.doris.load.loadv2.LoadTask;
 import org.apache.doris.mysql.privilege.PrivPredicate;
 import org.apache.doris.qe.ConnectContext;
 
 import com.google.common.base.Function;
 import com.google.common.base.Joiner;
-import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 
+import org.checkerframework.checker.nullness.qual.Nullable;
+
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -75,12 +76,11 @@ import java.util.Map.Entry;
 public class LoadStmt extends DdlStmt {
     public static final String TIMEOUT_PROPERTY = "timeout";
     public static final String MAX_FILTER_RATIO_PROPERTY = "max_filter_ratio";
-    public static final String LOAD_DELETE_FLAG_PROPERTY = "load_delete_flag";
     public static final String EXEC_MEM_LIMIT = "exec_mem_limit";
     public static final String CLUSTER_PROPERTY = "cluster";
-    private static final String VERSION = "version";
     public static final String STRICT_MODE = "strict_mode";
     public static final String TIMEZONE = "timezone";
+    public static final String LOAD_PARALLELISM = "load_parallelism";
 
     // for load data from Baidu Object Store(BOS)
     public static final String BOS_ENDPOINT = "bos_endpoint";
@@ -124,18 +124,43 @@ public class LoadStmt extends DdlStmt {
 
     private EtlJobType etlJobType = EtlJobType.UNKNOWN;
 
-    private String version = "v2";
-
-    // properties set
-    private final static ImmutableSet<String> PROPERTIES_SET = new ImmutableSet.Builder<String>()
-            .add(TIMEOUT_PROPERTY)
-            .add(MAX_FILTER_RATIO_PROPERTY)
-            .add(LOAD_DELETE_FLAG_PROPERTY)
-            .add(EXEC_MEM_LIMIT)
-            .add(CLUSTER_PROPERTY)
-            .add(STRICT_MODE)
-            .add(VERSION)
-            .add(TIMEZONE)
+    public final static ImmutableMap<String, Function> PROPERTIES_MAP = new ImmutableMap.Builder<String, Function>()
+            .put(TIMEOUT_PROPERTY, new Function<String, Long>() {
+                @Override
+                public @Nullable Long apply(@Nullable String s) {
+                    return Long.valueOf(s);
+                }
+            })
+            .put(MAX_FILTER_RATIO_PROPERTY, new Function<String, Double>() {
+                @Override
+                public @Nullable Double apply(@Nullable String s) {
+                    return Double.valueOf(s);
+                }
+            })
+            .put(EXEC_MEM_LIMIT, new Function<String, Long>() {
+                @Override
+                public @Nullable Long apply(@Nullable String s) {
+                    return Long.valueOf(s);
+                }
+            })
+            .put(STRICT_MODE, new Function<String, Boolean>() {
+                @Override
+                public @Nullable Boolean apply(@Nullable String s) {
+                    return Boolean.valueOf(s);
+                }
+            })
+            .put(TIMEZONE, new Function<String, String>() {
+                @Override
+                public @Nullable String apply(@Nullable String s) {
+                    return s;
+                }
+            })
+            .put(LOAD_PARALLELISM, new Function<String, Integer>() {
+                @Override
+                public @Nullable Integer apply(@Nullable String s) {
+                    return Integer.valueOf(s);
+                }
+            })
             .build();
 
     public LoadStmt(LabelName label, List<DataDescription> dataDescriptions,
@@ -198,7 +223,7 @@ public class LoadStmt extends DdlStmt {
         }
 
         for (Entry<String, String> entry : properties.entrySet()) {
-            if (!PROPERTIES_SET.contains(entry.getKey())) {
+            if (!PROPERTIES_MAP.containsKey(entry.getKey())) {
                 throw new DdlException(entry.getKey() + " is invalid property");
             }
         }
@@ -242,14 +267,6 @@ public class LoadStmt extends DdlStmt {
             }
         }
 
-        // version
-        final String versionProperty = properties.get(VERSION);
-        if (versionProperty != null) {
-            if (!versionProperty.equalsIgnoreCase(Load.VERSION)) {
-                throw new DdlException(VERSION + " must be " + Load.VERSION);
-            }
-        }
-
         // strict mode
         final String strictModeProperty = properties.get(STRICT_MODE);
         if (strictModeProperty != null) {
@@ -267,16 +284,6 @@ public class LoadStmt extends DdlStmt {
         }
     }
 
-    private void analyzeVersion() throws AnalysisException {
-        if (properties == null) {
-            return;
-        }
-        final String versionProperty = properties.get(VERSION);
-        if (versionProperty != null) {
-            throw new AnalysisException("Do not support VERSION property");
-        }
-    }
-
     @Override
     public void analyze(Analyzer analyzer) throws UserException {
         super.analyze(analyzer);
@@ -349,7 +356,6 @@ public class LoadStmt extends DdlStmt {
             throw new AnalysisException(e.getMessage());
         }
 
-        analyzeVersion();
         user = ConnectContext.get().getQualifiedUser();
     }
 
@@ -361,15 +367,10 @@ public class LoadStmt extends DdlStmt {
         return false;
     }
 
-    public String getVersion() {
-        return version;
-    }
-
     public void setEtlJobType(EtlJobType etlJobType) {
         this.etlJobType = etlJobType;
     }
 
-
     @Override
     public String toSql() {
         StringBuilder sb = new StringBuilder();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/FeMetaVersion.java b/fe/fe-core/src/main/java/org/apache/doris/common/FeMetaVersion.java
index e3ea090..ab17d33 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/FeMetaVersion.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/FeMetaVersion.java
@@ -198,6 +198,8 @@ public final class FeMetaVersion {
     public static final int VERSION_92 = 92;
     //jira: 4863 for load job support udf
     public static final int VERSION_93 = 93;
+    // refactor load job property persist method
+    public static final int VERSION_94 = 94;
     // note: when increment meta version, should assign the latest version to VERSION_CURRENT
-    public static final int VERSION_CURRENT = VERSION_93;
+    public static final int VERSION_CURRENT = VERSION_94;
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/Load.java b/fe/fe-core/src/main/java/org/apache/doris/load/Load.java
index 9f74128..cdeef41 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/Load.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/Load.java
@@ -474,10 +474,6 @@ public class Load {
                 }
             }
 
-            if (properties.containsKey(LoadStmt.LOAD_DELETE_FLAG_PROPERTY)) {
-                throw new DdlException("Do not support load_delete_flag");
-            }
-
             if (properties.containsKey(LoadStmt.EXEC_MEM_LIMIT)) {
                 try {
                     job.setExecMemLimit(Long.parseLong(properties.get(LoadStmt.EXEC_MEM_LIMIT)));
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/LoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/LoadJob.java
index b2939c9..5d0f26b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/LoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/LoadJob.java
@@ -1010,8 +1010,8 @@ public class LoadJob implements Writable {
         if (this.id == job.id) {
             return true;
         }
-
         return false;
     }
 
+
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
index 55d15c5..5027ecc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
@@ -24,7 +24,6 @@ import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.catalog.Table;
 import org.apache.doris.common.AnalysisException;
-import org.apache.doris.common.Config;
 import org.apache.doris.common.DataQualityException;
 import org.apache.doris.common.DuplicatedRequestException;
 import org.apache.doris.common.LabelAlreadyUsedException;
@@ -33,10 +32,10 @@ import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.DebugUtil;
 import org.apache.doris.common.util.LogBuilder;
 import org.apache.doris.common.util.LogKey;
+import org.apache.doris.common.util.MetaLockUtils;
 import org.apache.doris.common.util.ProfileManager;
 import org.apache.doris.common.util.RuntimeProfile;
 import org.apache.doris.common.util.TimeUtils;
-import org.apache.doris.common.util.MetaLockUtils;
 import org.apache.doris.load.BrokerFileGroup;
 import org.apache.doris.load.BrokerFileGroupAggInfo.FileGroupAggKey;
 import org.apache.doris.load.EtlJobType;
@@ -51,12 +50,12 @@ import org.apache.doris.transaction.TransactionState;
 import org.apache.doris.transaction.TransactionState.TxnCoordinator;
 import org.apache.doris.transaction.TransactionState.TxnSourceType;
 
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
 import com.google.common.base.Joiner;
 import com.google.common.collect.Lists;
 
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
@@ -79,17 +78,14 @@ public class BrokerLoadJob extends BulkLoadJob {
 
     // for log replay and unit test
     public BrokerLoadJob() {
-        super();
-        this.jobType = EtlJobType.BROKER;
+        super(EtlJobType.BROKER);
     }
 
     public BrokerLoadJob(long dbId, String label, BrokerDesc brokerDesc,
                          OriginStatement originStmt, UserIdentity userInfo)
             throws MetaNotFoundException {
-        super(dbId, label, originStmt, userInfo);
-        this.timeoutSecond = Config.broker_load_default_timeout_second;
+        super(EtlJobType.BROKER, dbId, label, originStmt, userInfo);
         this.brokerDesc = brokerDesc;
-        this.jobType = EtlJobType.BROKER;
         if (ConnectContext.get() != null && ConnectContext.get().getSessionVariable().isReportSucc()) {
             isReportSuccess = true;
         }
@@ -103,7 +99,7 @@ public class BrokerLoadJob extends BulkLoadJob {
                 .beginTransaction(dbId, Lists.newArrayList(fileGroupAggInfo.getAllTableIds()), label, null,
                         new TxnCoordinator(TxnSourceType.FE, FrontendOptions.getLocalHostAddress()),
                         TransactionState.LoadJobSourceType.BATCH_LOAD_JOB, id,
-                        timeoutSecond);
+                        getTimeout());
     }
 
     @Override
@@ -199,9 +195,9 @@ public class BrokerLoadJob extends BulkLoadJob {
                 OlapTable table = (OlapTable) db.getTable(tableId);
                 // Generate loading task and init the plan of task
                 LoadLoadingTask task = new LoadLoadingTask(db, table, brokerDesc,
-                        brokerFileGroups, getDeadlineMs(), execMemLimit,
-                        strictMode, transactionId, this, timezone, timeoutSecond,
-                        isReportSuccess ? jobProfile : null);
+                        brokerFileGroups, getDeadlineMs(), getExecMemLimit(),
+                        isStrictMode(), transactionId, this, getTimeZone(), getTimeout(),
+                        getLoadParallelism(), isReportSuccess ? jobProfile : null);
                 UUID uuid = UUID.randomUUID();
                 TUniqueId loadId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits());
                 task.init(loadId, attachment.getFileStatusByTable(aggKey),
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java
index b6732b3..35334ed 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java
@@ -37,6 +37,7 @@ import org.apache.doris.common.util.LogKey;
 import org.apache.doris.common.util.SqlParserUtils;
 import org.apache.doris.load.BrokerFileGroup;
 import org.apache.doris.load.BrokerFileGroupAggInfo;
+import org.apache.doris.load.EtlJobType;
 import org.apache.doris.load.FailMsg;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.OriginStatement;
@@ -85,13 +86,12 @@ public abstract class BulkLoadJob extends LoadJob {
     // we persist these sessionVariables due to the session is not available when replaying the job.
     private Map<String, String> sessionVariables = Maps.newHashMap();
 
-    // only for log replay
-    public BulkLoadJob() {
-        super();
+    public BulkLoadJob(EtlJobType jobType) {
+        super(jobType);
     }
 
-    public BulkLoadJob(long dbId, String label, OriginStatement originStmt, UserIdentity userInfo) throws MetaNotFoundException {
-        super(dbId, label);
+    public BulkLoadJob(EtlJobType jobType, long dbId, String label, OriginStatement originStmt, UserIdentity userInfo) throws MetaNotFoundException {
+        super(jobType, dbId, label);
         this.originStmt = originStmt;
         this.authorizationInfo = gatherAuthInfo();
         this.userInfo = userInfo;
@@ -164,7 +164,7 @@ public abstract class BulkLoadJob extends LoadJob {
     private AuthorizationInfo gatherAuthInfo() throws MetaNotFoundException {
         Database database = Catalog.getCurrentCatalog().getDb(dbId);
         if (database == null) {
-            throw new MetaNotFoundException("Database " + dbId + "has been deleted");
+            throw new MetaNotFoundException("Database " + dbId + " has been deleted");
         }
         return new AuthorizationInfo(database.getFullName(), getTableNames());
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/InsertLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/InsertLoadJob.java
index 7a72e21..bc0db60 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/InsertLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/InsertLoadJob.java
@@ -21,7 +21,6 @@ import org.apache.doris.catalog.AuthorizationInfo;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.Table;
-import org.apache.doris.common.Config;
 import org.apache.doris.common.MetaNotFoundException;
 import org.apache.doris.load.EtlJobType;
 import org.apache.doris.load.FailMsg;
@@ -46,13 +45,12 @@ public class InsertLoadJob extends LoadJob {
 
     // only for log replay
     public InsertLoadJob() {
-        super();
-        this.jobType = EtlJobType.INSERT;
+        super(EtlJobType.INSERT);
     }
 
     public InsertLoadJob(String label, long dbId, long tableId, long createTimestamp, String failMsg,
             String trackingUrl) throws MetaNotFoundException {
-        super(dbId, label);
+        super(EtlJobType.INSERT, dbId, label);
         this.tableId = tableId;
         this.createTimestamp = createTimestamp;
         this.loadStartTimestamp = createTimestamp;
@@ -65,8 +63,6 @@ public class InsertLoadJob extends LoadJob {
             this.failMsg = new FailMsg(CancelType.LOAD_RUN_FAIL, failMsg);
             this.progress = 0;
         }
-        this.jobType = EtlJobType.INSERT;
-        this.timeoutSecond = Config.insert_load_default_timeout_second;
         this.authorizationInfo = gatherAuthInfo();
         this.loadingStatus.setTrackingUrl(trackingUrl);
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
index 8b8ff0d..db79fa2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
@@ -17,7 +17,6 @@
 
 package org.apache.doris.load.loadv2;
 
-import com.google.gson.annotations.SerializedName;
 import org.apache.doris.analysis.LoadStmt;
 import org.apache.doris.catalog.AuthorizationInfo;
 import org.apache.doris.catalog.Catalog;
@@ -28,8 +27,8 @@ import org.apache.doris.common.DdlException;
 import org.apache.doris.common.DuplicatedRequestException;
 import org.apache.doris.common.ErrorCode;
 import org.apache.doris.common.ErrorReport;
-import org.apache.doris.common.FeMetaVersion;
 import org.apache.doris.common.FeConstants;
+import org.apache.doris.common.FeMetaVersion;
 import org.apache.doris.common.LabelAlreadyUsedException;
 import org.apache.doris.common.LoadException;
 import org.apache.doris.common.MetaNotFoundException;
@@ -66,6 +65,7 @@ import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import com.google.common.collect.Table;
 import com.google.gson.Gson;
+import com.google.gson.annotations.SerializedName;
 
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -95,16 +95,6 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements
     // the auth info could be null when load job is created before commit named 'Persist auth info in load job'
     protected AuthorizationInfo authorizationInfo;
 
-    // optional properties
-    // timeout second need to be reset in constructor of subclass
-    protected long timeoutSecond = Config.broker_load_default_timeout_second;
-    protected long execMemLimit = 2147483648L; // 2GB;
-    protected double maxFilterRatio = Config.default_max_filter_ratio;
-    protected boolean strictMode = false; // default is false
-    protected String timezone = TimeUtils.DEFAULT_TIME_ZONE;
-    @Deprecated
-    protected boolean deleteFlag = false;
-
     protected long createTimestamp = System.currentTimeMillis();
     protected long loadStartTimestamp = -1;
     protected long finishTimestamp = -1;
@@ -132,6 +122,9 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements
 
     protected LoadStatistic loadStatistic = new LoadStatistic();
 
+    // This map is used to save job properties.
+    private Map<String, Object> jobProperties = Maps.newHashMap();
+
     // only for persistence param. see readFields() for usage
     private boolean isJobTypeRead = false;
 
@@ -206,11 +199,13 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements
         }
     }
 
-    // only for log replay
-    public LoadJob() {
+    public LoadJob(EtlJobType jobType) {
+        this.jobType = jobType;
+        initDefaultJobProperties();
     }
 
-    public LoadJob(long dbId, String label) {
+    public LoadJob(EtlJobType jobType, long dbId, String label) {
+        this(jobType);
         this.id = Catalog.getCurrentCatalog().getNextId();
         this.dbId = dbId;
         this.label = label;
@@ -265,7 +260,7 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements
     }
 
     protected long getDeadlineMs() {
-        return createTimestamp + timeoutSecond * 1000;
+        return createTimestamp + getTimeout() * 1000;
     }
 
     private boolean isTimeout() {
@@ -324,58 +319,59 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements
         return state == JobState.FINISHED || state == JobState.CANCELLED || state == JobState.UNKNOWN;
     }
 
-    protected void setJobProperties(Map<String, String> properties) throws DdlException {
-        // resource info
+    public void setJobProperties(Map<String, String> properties) throws DdlException {
+        initDefaultJobProperties();
+
+        // set property from session variables
         if (ConnectContext.get() != null) {
-            execMemLimit = ConnectContext.get().getSessionVariable().getMaxExecMemByte();
+            jobProperties.put(LoadStmt.EXEC_MEM_LIMIT, ConnectContext.get().getSessionVariable().getMaxExecMemByte());
+            jobProperties.put(LoadStmt.TIMEZONE, ConnectContext.get().getSessionVariable().getTimeZone());
         }
 
-        // job properties
-        if (properties != null) {
-            if (properties.containsKey(LoadStmt.TIMEOUT_PROPERTY)) {
-                try {
-                    timeoutSecond = Integer.parseInt(properties.get(LoadStmt.TIMEOUT_PROPERTY));
-                } catch (NumberFormatException e) {
-                    throw new DdlException("Timeout is not INT", e);
-                }
-            }
-
-            if (properties.containsKey(LoadStmt.MAX_FILTER_RATIO_PROPERTY)) {
-                try {
-                    maxFilterRatio = Double.parseDouble(properties.get(LoadStmt.MAX_FILTER_RATIO_PROPERTY));
-                } catch (NumberFormatException e) {
-                    throw new DdlException("Max filter ratio is not DOUBLE", e);
-                }
-            }
-
-            if (properties.containsKey(LoadStmt.LOAD_DELETE_FLAG_PROPERTY)) {
-                String flag = properties.get(LoadStmt.LOAD_DELETE_FLAG_PROPERTY);
-                if (flag.equalsIgnoreCase("true") || flag.equalsIgnoreCase("false")) {
-                    deleteFlag = Boolean.parseBoolean(flag);
-                } else {
-                    throw new DdlException("Value of delete flag is invalid");
-                }
-            }
+        if (properties == null || properties.isEmpty()) {
+            return;
+        }
 
-            if (properties.containsKey(LoadStmt.EXEC_MEM_LIMIT)) {
-                try {
-                    execMemLimit = Long.parseLong(properties.get(LoadStmt.EXEC_MEM_LIMIT));
-                } catch (NumberFormatException e) {
-                    throw new DdlException("Execute memory limit is not Long", e);
-                }
+        // set property from specified job properties
+        for (String key : LoadStmt.PROPERTIES_MAP.keySet()) {
+            if (!properties.containsKey(key)) {
+                continue;
             }
-
-            if (properties.containsKey(LoadStmt.STRICT_MODE)) {
-                strictMode = Boolean.valueOf(properties.get(LoadStmt.STRICT_MODE));
+            try {
+                jobProperties.put(key, LoadStmt.PROPERTIES_MAP.get(key).apply(properties.get(key)));
+            } catch (Exception e) {
+                throw new DdlException("Failed to set property " + key + ". Error: " + e.getMessage());
             }
+        }
+    }
 
-            if (properties.containsKey(LoadStmt.TIMEZONE)) {
-                timezone = properties.get(LoadStmt.TIMEZONE);
-            } else if (ConnectContext.get() != null) {
-                // get timezone for session variable
-                timezone = ConnectContext.get().getSessionVariable().getTimeZone();
-            }
+    private void initDefaultJobProperties() {
+        long timeout = Config.broker_load_default_timeout_second;
+        switch (jobType) {
+            case SPARK:
+                timeout = Config.spark_load_default_timeout_second;
+                break;
+            case HADOOP:
+                timeout = Config.hadoop_load_default_timeout_second;
+                break;
+            case BROKER:
+                timeout = Config.broker_load_default_timeout_second;
+                break;
+            case INSERT:
+                timeout = Config.insert_load_default_timeout_second;
+                break;
+            case MINI:
+                timeout = Config.stream_load_default_timeout_second;
+                break;
+            default:
+                break;
         }
+        jobProperties.put(LoadStmt.TIMEOUT_PROPERTY, timeout);
+        jobProperties.put(LoadStmt.EXEC_MEM_LIMIT, 2 * 1024 * 1024 * 1024L);
+        jobProperties.put(LoadStmt.MAX_FILTER_RATIO_PROPERTY, 0.0);
+        jobProperties.put(LoadStmt.STRICT_MODE, false);
+        jobProperties.put(LoadStmt.TIMEZONE, TimeUtils.DEFAULT_TIME_ZONE);
+        jobProperties.put(LoadStmt.LOAD_PARALLELISM, 1);
     }
 
     public void isJobTypeRead(boolean jobTypeRead) {
@@ -671,7 +667,7 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements
 
         long normalNum = Long.parseLong(counters.get(DPP_NORMAL_ALL));
         long abnormalNum = Long.parseLong(counters.get(DPP_ABNORMAL_ALL));
-        if (abnormalNum > (abnormalNum + normalNum) * maxFilterRatio) {
+        if (abnormalNum > (abnormalNum + normalNum) * getMaxFilterRatio()) {
             return false;
         }
 
@@ -733,8 +729,8 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements
             }
 
             // task info
-            jobInfo.add("cluster:" + getResourceName() + "; timeout(s):" + timeoutSecond
-                                + "; max_filter_ratio:" + maxFilterRatio);
+            jobInfo.add("cluster:" + getResourceName() + "; timeout(s):" + getTimeout()
+                                + "; max_filter_ratio:" + getMaxFilterRatio());
 
             // error msg
             if (failMsg == null) {
@@ -970,10 +966,6 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements
         out.writeLong(dbId);
         Text.writeString(out, label);
         Text.writeString(out, state.name());
-        out.writeLong(timeoutSecond);
-        out.writeLong(execMemLimit);
-        out.writeDouble(maxFilterRatio);
-        out.writeBoolean(deleteFlag);
         out.writeLong(createTimestamp);
         out.writeLong(loadStartTimestamp);
         out.writeLong(finishTimestamp);
@@ -985,7 +977,6 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements
         }
         out.writeInt(progress);
         loadingStatus.write(out);
-        out.writeBoolean(strictMode);
         out.writeLong(transactionId);
         if (authorizationInfo == null) {
             out.writeBoolean(false);
@@ -993,10 +984,20 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements
             out.writeBoolean(true);
             authorizationInfo.write(out);
         }
-        Text.writeString(out, timezone);
+
+        out.writeInt(this.jobProperties.size());
+        for (Map.Entry<String, Object> entry : jobProperties.entrySet()) {
+            Text.writeString(out, entry.getKey());
+            Text.writeString(out, String.valueOf(entry.getValue()));
+        }
     }
 
     public void readFields(DataInput in) throws IOException {
+        if (Catalog.getCurrentCatalogJournalVersion() < FeMetaVersion.VERSION_94) {
+            readFieldOld(in);
+            return;
+        }
+
         if (!isJobTypeRead) {
             jobType = EtlJobType.valueOf(Text.readString(in));
             isJobTypeRead = true;
@@ -1006,14 +1007,65 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements
         dbId = in.readLong();
         label = Text.readString(in);
         state = JobState.valueOf(Text.readString(in));
+
+        createTimestamp = in.readLong();
+        loadStartTimestamp = in.readLong();
+        finishTimestamp = in.readLong();
+        if (in.readBoolean()) {
+            failMsg = new FailMsg();
+            failMsg.readFields(in);
+        }
+        progress = in.readInt();
+        loadingStatus.readFields(in);
+        transactionId = in.readLong();
+        if (in.readBoolean()) {
+            authorizationInfo = new AuthorizationInfo();
+            authorizationInfo.readFields(in);
+        }
+
+        int size = in.readInt();
+        Map<String, String> tmpProperties = Maps.newHashMap();
+        for (int i = 0; i < size; i++) {
+            String key = Text.readString(in);
+            String val = Text.readString(in);
+            tmpProperties.put(key, val);
+        }
+        // init jobProperties
+        try {
+            setJobProperties(tmpProperties);
+        } catch (Exception e) {
+            // should not happen
+            throw new IOException("failed to replay job property", e);
+        }
+    }
+
+    // This method reads the old meta format, in which the job properties were persisted one by one.
+    // The new meta format saves the job properties into the jobProperties map.
+    @Deprecated
+    private void readFieldOld(DataInput in) throws IOException {
+        if (!isJobTypeRead) {
+            jobType = EtlJobType.valueOf(Text.readString(in));
+            isJobTypeRead = true;
+        }
+
+        id = in.readLong();
+        dbId = in.readLong();
+        label = Text.readString(in);
+        state = JobState.valueOf(Text.readString(in));
+        long timeoutSecond;
         if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_54) {
             timeoutSecond = in.readLong();
         } else {
             timeoutSecond = in.readInt();
         }
-        execMemLimit = in.readLong();
-        maxFilterRatio = in.readDouble();
-        deleteFlag = in.readBoolean();
+        long execMemLimit = in.readLong();
+        double maxFilterRatio = in.readDouble();
+        // delete flag is never used
+        boolean deleteFlag = in.readBoolean();
+        jobProperties.put(LoadStmt.TIMEOUT_PROPERTY, timeoutSecond);
+        jobProperties.put(LoadStmt.EXEC_MEM_LIMIT, execMemLimit);
+        jobProperties.put(LoadStmt.MAX_FILTER_RATIO_PROPERTY, maxFilterRatio);
+
         createTimestamp = in.readLong();
         loadStartTimestamp = in.readLong();
         finishTimestamp = in.readLong();
@@ -1024,7 +1076,8 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements
         progress = in.readInt();
         loadingStatus.readFields(in);
         if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_54) {
-            strictMode = in.readBoolean();
+            boolean strictMode = in.readBoolean();
+            jobProperties.put(LoadStmt.STRICT_MODE, strictMode);
             transactionId = in.readLong();
         }
         if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_56) {
@@ -1034,7 +1087,8 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements
             }
         }
         if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_61) {
-            timezone = Text.readString(in);
+            String timezone = Text.readString(in);
+            jobProperties.put(LoadStmt.TIMEZONE, timezone);
         }
     }
 
@@ -1093,4 +1147,37 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements
             return GsonUtils.GSON.fromJson(json, LoadJobStateUpdateInfo.class);
         }
     }
+
+    // unit: second
+    protected long getTimeout() {
+        return (long) jobProperties.get(LoadStmt.TIMEOUT_PROPERTY);
+    }
+
+    protected void setTimeout(long timeout) {
+        jobProperties.put(LoadStmt.TIMEOUT_PROPERTY, timeout);
+    }
+
+    protected long getExecMemLimit() {
+        return (long) jobProperties.get(LoadStmt.EXEC_MEM_LIMIT);
+    }
+
+    protected double getMaxFilterRatio() {
+        return (double) jobProperties.get(LoadStmt.MAX_FILTER_RATIO_PROPERTY);
+    }
+
+    protected void setMaxFilterRatio(double maxFilterRatio) {
+        jobProperties.put(LoadStmt.MAX_FILTER_RATIO_PROPERTY, maxFilterRatio);
+    }
+
+    protected boolean isStrictMode() {
+        return (boolean) jobProperties.get(LoadStmt.STRICT_MODE);
+    }
+
+    protected String getTimeZone() {
+        return (String) jobProperties.get(LoadStmt.TIMEZONE);
+    }
+
+    public int getLoadParallelism() {
+        return (int) jobProperties.get(LoadStmt.LOAD_PARALLELISM);
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
index 0e288ad..c8bebbe 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
@@ -63,6 +63,7 @@ public class LoadLoadingTask extends LoadTask {
     private final String timezone;
     // timeout of load job, in seconds
     private final long timeoutS;
+    private final int loadParallelism;
 
     private LoadingTaskPlanner planner;
 
@@ -74,7 +75,7 @@ public class LoadLoadingTask extends LoadTask {
                            BrokerDesc brokerDesc, List<BrokerFileGroup> fileGroups,
                            long jobDeadlineMs, long execMemLimit, boolean strictMode,
                            long txnId, LoadTaskCallback callback, String timezone,
-                           long timeoutS, RuntimeProfile profile) {
+                           long timeoutS, int loadParallelism, RuntimeProfile profile) {
         super(callback, TaskType.LOADING);
         this.db = db;
         this.table = table;
@@ -88,13 +89,14 @@ public class LoadLoadingTask extends LoadTask {
         this.retryTime = 2; // 2 times is enough
         this.timezone = timezone;
         this.timeoutS = timeoutS;
+        this.loadParallelism = loadParallelism;
         this.jobProfile = profile;
     }
 
     public void init(TUniqueId loadId, List<List<TBrokerFileStatus>> fileStatusList, int fileNum, UserIdentity userInfo) throws UserException {
         this.loadId = loadId;
         planner = new LoadingTaskPlanner(callback.getCallbackId(), txnId, db.getId(), table,
-                brokerDesc, fileGroups, strictMode, timezone, this.timeoutS, userInfo);
+                brokerDesc, fileGroups, strictMode, timezone, this.timeoutS, this.loadParallelism, userInfo);
         planner.plan(loadId, fileStatusList, fileNum);
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java
index c728701..9d4a497 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java
@@ -67,6 +67,7 @@ public class LoadingTaskPlanner {
     private final List<BrokerFileGroup> fileGroups;
     private final boolean strictMode;
     private final long timeoutS;    // timeout of load job, in second
+    private final int loadParallelism;
     private UserIdentity userInfo;
     // Something useful
     // ConnectContext here is just a dummy object to avoid some NPE problem, like ctx.getDatabase()
@@ -81,7 +82,7 @@ public class LoadingTaskPlanner {
 
     public LoadingTaskPlanner(Long loadJobId, long txnId, long dbId, OlapTable table,
                               BrokerDesc brokerDesc, List<BrokerFileGroup> brokerFileGroups,
-                              boolean strictMode, String timezone, long timeoutS,
+                              boolean strictMode, String timezone, long timeoutS, int loadParallelism,
                               UserIdentity userInfo) {
         this.loadJobId = loadJobId;
         this.txnId = txnId;
@@ -92,6 +93,7 @@ public class LoadingTaskPlanner {
         this.strictMode = strictMode;
         this.analyzer.setTimezone(timezone);
         this.timeoutS = timeoutS;
+        this.loadParallelism = loadParallelism;
         this.userInfo = userInfo;
         if (Catalog.getCurrentCatalog().getAuth().checkDbPriv(userInfo,
                 Catalog.getCurrentCatalog().getDb(dbId).getFullName(), PrivPredicate.SELECT)) {
@@ -121,7 +123,7 @@ public class LoadingTaskPlanner {
         // 1. Broker scan node
         BrokerScanNode scanNode = new BrokerScanNode(new PlanNodeId(nextNodeId++), tupleDesc, "BrokerScanNode",
                                                      fileStatusesList, filesAdded);
-        scanNode.setLoadInfo(loadJobId, txnId, table, brokerDesc, fileGroups, strictMode);
+        scanNode.setLoadInfo(loadJobId, txnId, table, brokerDesc, fileGroups, strictMode, loadParallelism);
         scanNode.init(analyzer);
         scanNode.finalize(analyzer);
         scanNodes.add(scanNode);
@@ -135,6 +137,7 @@ public class LoadingTaskPlanner {
 
         // 3. Plan fragment
         PlanFragment sinkFragment = new PlanFragment(new PlanFragmentId(0), scanNode, DataPartition.RANDOM);
+        sinkFragment.setParallelExecNum(loadParallelism);
         sinkFragment.setSink(olapTableSink);
 
         fragments.add(sinkFragment);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MiniLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MiniLoadJob.java
index a77da09..382867a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MiniLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MiniLoadJob.java
@@ -17,12 +17,10 @@
 
 package org.apache.doris.load.loadv2;
 
-import com.google.common.collect.Lists;
 import org.apache.doris.catalog.AuthorizationInfo;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.common.AnalysisException;
-import org.apache.doris.common.Config;
 import org.apache.doris.common.DuplicatedRequestException;
 import org.apache.doris.common.LabelAlreadyUsedException;
 import org.apache.doris.common.MetaNotFoundException;
@@ -32,9 +30,10 @@ import org.apache.doris.service.FrontendOptions;
 import org.apache.doris.thrift.TMiniLoadBeginRequest;
 import org.apache.doris.transaction.BeginTransactionException;
 import org.apache.doris.transaction.TransactionState;
-import org.apache.doris.transaction.TransactionState.TxnSourceType;
 import org.apache.doris.transaction.TransactionState.TxnCoordinator;
+import org.apache.doris.transaction.TransactionState.TxnSourceType;
 
+import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 
 import org.apache.logging.log4j.LogManager;
@@ -52,24 +51,19 @@ public class MiniLoadJob extends LoadJob {
 
     private long tableId;
 
-    // only for log replay
     public MiniLoadJob() {
-        super();
-        this.jobType = EtlJobType.MINI;
+        super(EtlJobType.MINI);
     }
 
     public MiniLoadJob(long dbId, long tableId, TMiniLoadBeginRequest request) throws MetaNotFoundException {
-        super(dbId, request.getLabel());
+        super(EtlJobType.MINI, dbId, request.getLabel());
         this.tableId = tableId;
-        this.jobType = EtlJobType.MINI;
         this.tableName = request.getTbl();
         if (request.isSetTimeoutSecond()) {
-            this.timeoutSecond = request.getTimeoutSecond();
-        } else {
-            this.timeoutSecond = Config.stream_load_default_timeout_second;
+            setTimeout(request.getTimeoutSecond());
         }
         if (request.isSetMaxFilterRatio()) {
-            this.maxFilterRatio = request.getMaxFilterRatio();
+            setMaxFilterRatio(request.getMaxFilterRatio());
         }
         this.createTimestamp = request.getCreateTimestamp();
         this.loadStartTimestamp = createTimestamp;
@@ -102,7 +96,7 @@ public class MiniLoadJob extends LoadJob {
                 .beginTransaction(dbId, Lists.newArrayList(tableId), label, requestId,
                                   new TxnCoordinator(TxnSourceType.FE, FrontendOptions.getLocalHostAddress()),
                                   TransactionState.LoadJobSourceType.BACKEND_STREAMING, id,
-                                  timeoutSecond);
+                                  getTimeout());
     }
 
     @Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
index d8907f5..5d40b95 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
@@ -145,21 +145,18 @@ public class SparkLoadJob extends BulkLoadJob {
 
     // only for log replay
     public SparkLoadJob() {
-        super();
-        jobType = EtlJobType.SPARK;
+        super(EtlJobType.SPARK);
     }
 
     public SparkLoadJob(long dbId, String label, ResourceDesc resourceDesc,
                         OriginStatement originStmt, UserIdentity userInfo)
             throws MetaNotFoundException {
-        super(dbId, label, originStmt, userInfo);
+        super(EtlJobType.SPARK, dbId, label, originStmt, userInfo);
         this.resourceDesc = resourceDesc;
-        timeoutSecond = Config.spark_load_default_timeout_second;
-        jobType = EtlJobType.SPARK;
     }
 
     @Override
-    protected void setJobProperties(Map<String, String> properties) throws DdlException {
+    public void setJobProperties(Map<String, String> properties) throws DdlException {
         super.setJobProperties(properties);
 
         // set spark resource and broker desc
@@ -191,7 +188,7 @@ public class SparkLoadJob extends BulkLoadJob {
        transactionId = Catalog.getCurrentGlobalTransactionMgr()
                 .beginTransaction(dbId, Lists.newArrayList(fileGroupAggInfo.getAllTableIds()), label, null,
                                   new TxnCoordinator(TxnSourceType.FE, FrontendOptions.getLocalHostAddress()),
-                                  LoadJobSourceType.FRONTEND, id, timeoutSecond);
+                                  LoadJobSourceType.FRONTEND, id, getTimeout());
     }
 
     @Override
@@ -948,4 +945,4 @@ public class SparkLoadJob extends BulkLoadJob {
             tDescriptorTable = descTable.toThrift();
         }
     }
-}
\ No newline at end of file
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadPendingTask.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadPendingTask.java
index f129f46..deb612d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadPendingTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadPendingTask.java
@@ -190,8 +190,8 @@ public class SparkLoadPendingTask extends LoadTask {
         String outputFilePattern = EtlJobConfig.getOutputFilePattern(loadLabel, FilePatternVersion.V1);
         // strictMode timezone properties
         EtlJobProperty properties = new EtlJobProperty();
-        properties.strictMode = ((LoadJob) callback).strictMode;
-        properties.timezone = ((LoadJob) callback).timezone;
+        properties.strictMode = ((LoadJob) callback).isStrictMode();
+        properties.timezone = ((LoadJob) callback).getTimeZone();
         etlJobConfig = new EtlJobConfig(tables, outputFilePattern, loadLabel, properties);
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java
index 5923d9c..bb52fbb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java
@@ -50,14 +50,14 @@ import org.apache.doris.thrift.TScanRange;
 import org.apache.doris.thrift.TScanRangeLocation;
 import org.apache.doris.thrift.TScanRangeLocations;
 
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
 import java.nio.charset.Charset;
 import java.util.Collections;
 import java.util.Comparator;
@@ -99,7 +99,8 @@ public class BrokerScanNode extends LoadScanNode {
     private Table targetTable;
     private BrokerDesc brokerDesc;
     private List<BrokerFileGroup> fileGroups;
-    private boolean strictMode = true;
+    private boolean strictMode = false;
+    private int loadParallelism = 1;
 
     private List<List<TBrokerFileStatus>> fileStatusesList;
     // file num
@@ -177,13 +178,15 @@ public class BrokerScanNode extends LoadScanNode {
                             Table targetTable,
                             BrokerDesc brokerDesc,
                             List<BrokerFileGroup> fileGroups,
-                            boolean strictMode) {
+                            boolean strictMode,
+                            int loadParallelism) {
         this.loadJobId = loadJobId;
         this.txnId = txnId;
         this.targetTable = targetTable;
         this.brokerDesc = brokerDesc;
         this.fileGroups = fileGroups;
         this.strictMode = strictMode;
+        this.loadParallelism = loadParallelism;
     }
 
     // Called from init, construct source tuple information
@@ -340,7 +343,8 @@ public class BrokerScanNode extends LoadScanNode {
         numInstances = 1;
         if (!brokerDesc.isMultiLoadBroker()) {
             numInstances = (int) (totalBytes / Config.min_bytes_per_broker_scanner);
-            numInstances = Math.min(backends.size(), numInstances);
+            int totalLoadParallelism = loadParallelism * backends.size();
+            numInstances = Math.min(totalLoadParallelism, numInstances);
             numInstances = Math.min(numInstances, Config.max_broker_concurrency);
             numInstances = Math.max(1, numInstances);
         }
@@ -351,6 +355,7 @@ public class BrokerScanNode extends LoadScanNode {
             throw new UserException(
                     "Scan bytes per broker scanner exceed limit: " + Config.max_bytes_per_broker_scanner);
         }
+        LOG.info("number instance of broker scan node is: {}, bytes per instance: {}", numInstances, bytesPerInstance);
     }
 
     private void assignBackends() throws UserException {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/LoadScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/LoadScanNode.java
index 83446c8..f786cf3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/LoadScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/LoadScanNode.java
@@ -51,7 +51,6 @@ public abstract class LoadScanNode extends ScanNode {
 
     protected Expr deleteCondition;
     protected LoadTask.MergeType mergeType = LoadTask.MergeType.APPEND;
-    protected int numInstances;
 
     public LoadScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName) {
         super(id, desc, planNodeName);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java
index b859995..1fa6acd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java
@@ -150,6 +150,12 @@ public class PlanFragment extends TreeNode<PlanFragment> {
         }
     }
 
+    // Manually set parallel exec number
+    // Currently for broker load
+    public void setParallelExecNum(int parallelExecNum) {
+        this.parallelExecNum = parallelExecNum;
+    }
+
     public void setOutputExprs(List<Expr> outputExprs) {
         this.outputExprs = Expr.cloneList(outputExprs, null);
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java
index 1f1a2ec..705299a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java
@@ -80,7 +80,6 @@ import org.apache.doris.catalog.Catalog;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.load.EtlJobType;
-import org.apache.doris.load.Load;
 
 public class DdlExecutor {
     public static void execute(Catalog catalog, DdlStmt ddlStmt) throws Exception {
@@ -129,7 +128,7 @@ public class DdlExecutor {
                 throw new DdlException("Load job by hadoop cluster is disabled."
                         + " Try using broker load. See 'help broker load;'");
             }
-            if (loadStmt.getVersion().equals(Load.VERSION) || jobType == EtlJobType.HADOOP) {
+            if (jobType == EtlJobType.HADOOP) {
                 catalog.getLoadManager().createLoadJobV1FromStmt(loadStmt, jobType, System.currentTimeMillis());
             } else {
                 catalog.getLoadManager().createLoadJobFromStmt(loadStmt);
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java
index c23cf95..894ed2f 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java
@@ -17,6 +17,12 @@
 
 package org.apache.doris.load.loadv2;
 
+import mockit.Expectations;
+import mockit.Injectable;
+import mockit.Mock;
+import mockit.MockUp;
+import mockit.Mocked;
+
 import org.apache.doris.analysis.Analyzer;
 import org.apache.doris.analysis.BrokerDesc;
 import org.apache.doris.analysis.DataDescription;
@@ -49,19 +55,13 @@ import org.apache.doris.task.MasterTaskExecutor;
 import org.apache.doris.thrift.TUniqueId;
 import org.apache.doris.transaction.TransactionState;
 
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
-import mockit.Expectations;
-import mockit.Injectable;
-import mockit.Mock;
-import mockit.MockUp;
-import mockit.Mocked;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
 
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -361,7 +361,7 @@ public class BrokerLoadJobTest {
         TUniqueId loadId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits());
         RuntimeProfile jobProfile = new RuntimeProfile("test");
         LoadLoadingTask task = new LoadLoadingTask(database, olapTable,brokerDesc, fileGroups,
-                100, 100, false, 100, callback, "", 100,
+                100, 100, false, 100, callback, "", 100, 1,
                 jobProfile);
         try {
             UserIdentity userInfo = new UserIdentity("root", "localhost");
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/LoadJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/LoadJobTest.java
index ad981d7..c25cd6d 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/LoadJobTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/LoadJobTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.doris.load.loadv2;
 
-import com.google.common.collect.Lists;
 import org.apache.doris.analysis.LoadStmt;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.common.AnalysisException;
@@ -37,6 +36,7 @@ import org.apache.doris.transaction.BeginTransactionException;
 import org.apache.doris.transaction.GlobalTransactionMgr;
 import org.apache.doris.transaction.TransactionState;
 
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
 import org.junit.Assert;
@@ -99,10 +99,10 @@ public class LoadJobTest {
         LoadJob loadJob = new BrokerLoadJob();
         try {
             loadJob.setJobProperties(jobProperties);
-            Assert.assertEquals(1000, (long) Deencapsulation.getField(loadJob, "timeoutSecond"));
-            Assert.assertEquals(0.1, Deencapsulation.getField(loadJob, "maxFilterRatio"), 0);
-            Assert.assertEquals(1024, (long) Deencapsulation.getField(loadJob, "execMemLimit"));
-            Assert.assertTrue(Deencapsulation.getField(loadJob, "strictMode"));
+            Assert.assertEquals(1000, loadJob.getTimeout());
+            Assert.assertEquals(0.1, loadJob.getMaxFilterRatio(), 0);
+            Assert.assertEquals(1024, loadJob.getExecMemLimit());
+            Assert.assertTrue(loadJob.isStrictMode());
         } catch (DdlException e) {
             Assert.fail(e.getMessage());
         }
@@ -155,8 +155,7 @@ public class LoadJobTest {
     @Test
     public void testProcessTimeoutWithLongTimeoutSecond() {
         LoadJob loadJob = new BrokerLoadJob();
-        Deencapsulation.setField(loadJob, "timeoutSecond", 1000L);
-
+        loadJob.setTimeout(1000L);
         loadJob.processTimeout();
         Assert.assertEquals(JobState.PENDING, loadJob.getState());
     }
@@ -164,7 +163,7 @@ public class LoadJobTest {
     @Test
     public void testProcessTimeout(@Mocked Catalog catalog, @Mocked EditLog editLog) {
         LoadJob loadJob = new BrokerLoadJob();
-        Deencapsulation.setField(loadJob, "timeoutSecond", 0);
+        loadJob.setTimeout(0);
         new Expectations() {
             {
                 catalog.getEditLog();
@@ -206,3 +205,4 @@ public class LoadJobTest {
         Assert.assertEquals(0, loadJob.idToTasks.size());
     }
 }
+
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/LoadManagerTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/LoadManagerTest.java
index 80f6129..f6a4e0c 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/LoadManagerTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/LoadManagerTest.java
@@ -30,14 +30,14 @@ import org.apache.doris.common.jmockit.Deencapsulation;
 import org.apache.doris.load.EtlJobType;
 import org.apache.doris.meta.MetaContext;
 
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.File;
@@ -128,7 +128,7 @@ public class LoadManagerTest {
                 result = "tablename";
                 Catalog.getCurrentCatalogJournalVersion();
                 minTimes = 0;
-                result = FeMetaVersion.VERSION_56;
+                result = FeMetaVersion.VERSION_CURRENT;
             }
         };
 
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java
index b975b2e..957fdb9 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java
@@ -252,7 +252,7 @@ public class SparkLoadJobTest {
         sparkConfigs.put("spark.hadoop.yarn.resourcemanager.address", "127.0.0.1:9999");
         SparkLoadJob job = new SparkLoadJob(dbId, label, null, new OriginStatement(originStmt, 0), new UserIdentity("root", "0.0.0.0"));
         job.state = JobState.ETL;
-        job.maxFilterRatio = 0.15;
+        job.setMaxFilterRatio(0.15);
         job.transactionId = transactionId;
         Deencapsulation.setField(job, "appId", appId);
         Deencapsulation.setField(job, "etlOutputPath", etlOutputPath);
@@ -510,4 +510,4 @@ public class SparkLoadJobTest {
             file.delete();
         }
     }
-}
\ No newline at end of file
+}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/persist/LoadJobV2PersistTest.java b/fe/fe-core/src/test/java/org/apache/doris/persist/LoadJobV2PersistTest.java
new file mode 100644
index 0000000..9bd0434
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/persist/LoadJobV2PersistTest.java
@@ -0,0 +1,105 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.persist;
+
+import org.apache.doris.analysis.BrokerDesc;
+import org.apache.doris.analysis.LoadStmt;
+import org.apache.doris.analysis.UserIdentity;
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.Table;
+import org.apache.doris.common.FeMetaVersion;
+import org.apache.doris.load.EtlJobType;
+import org.apache.doris.load.loadv2.BrokerLoadJob;
+import org.apache.doris.qe.OriginStatement;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.google.common.collect.Maps;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.util.Map;
+
+import mockit.Expectations;
+import mockit.Injectable;
+import mockit.Mocked;
+
+public class LoadJobV2PersistTest {
+    private BrokerLoadJob createJob() throws Exception {
+        String loadStmt = "";
+        BrokerDesc brokerDesc = new BrokerDesc("bos", Maps.newHashMap());
+        OriginStatement originStatement = new OriginStatement(loadStmt, 0);
+        BrokerLoadJob brokerLoadJob = new BrokerLoadJob(1L, "label", brokerDesc, originStatement,
+                UserIdentity.ADMIN);
+        Map<String, String> jobProperties = Maps.newHashMap();
+        jobProperties.put(LoadStmt.LOAD_PARALLELISM, "5");
+        brokerLoadJob.setJobProperties(jobProperties);
+        return brokerLoadJob;
+    }
+
+    @Test
+    public void testBrokerLoadJob(@Mocked Catalog catalog,
+                                  @Injectable Database database,
+                                  @Injectable Table table) throws Exception {
+
+        new Expectations() {
+            {
+                catalog.getDb(anyLong);
+                minTimes = 0;
+                result = database;
+                database.getTable(anyLong);
+                minTimes = 0;
+                result = table;
+                table.getName();
+                minTimes = 0;
+                result = "tablename";
+                Catalog.getCurrentCatalogJournalVersion();
+                minTimes = 0;
+                result = FeMetaVersion.VERSION_CURRENT;
+            }
+        };
+
+        // 1. Write objects to file
+        File file = new File("./testBrokerLoadJob");
+        file.createNewFile();
+        DataOutputStream dos = new DataOutputStream(new FileOutputStream(file));
+
+        BrokerLoadJob job = createJob();
+        Assert.assertEquals(5, job.getLoadParallelism());
+        job.write(dos);
+
+        dos.flush();
+        dos.close();
+
+        // 2. Read objects from file
+        DataInputStream dis = new DataInputStream(new FileInputStream(file));
+
+        BrokerLoadJob rJob = (BrokerLoadJob)  BrokerLoadJob.read(dis);
+        Assert.assertEquals(5, rJob.getLoadParallelism());
+        Assert.assertEquals(EtlJobType.BROKER, rJob.getJobType());
+
+        // 3. delete files
+        dis.close();
+        file.delete();
+    }
+}

