Posted to commits@doris.apache.org by "wsjz (via GitHub)" <gi...@apache.org> on 2023/11/03 02:58:23 UTC

[PR] [feature](Load)(step2)support nereids load job schedule [doris]

wsjz opened a new pull request, #26356:
URL: https://github.com/apache/doris/pull/26356

   ## Proposed changes
   
   We integrate the new load job manager into the new job scheduling framework, so that the INSERT INTO task can be scheduled once a broker load SQL statement has been converted into an INSERT INTO TVF (table-valued function) statement.
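To make the conversion described above concrete, here is a minimal sketch. It renders a target table and a set of source properties into an `INSERT INTO ... SELECT * FROM <tvf>(...)` statement. The helper name `toInsertTvfSql`, the choice of the `s3` TVF, and the `uri`/`format` property keys are illustrative assumptions, not the code in this PR. Check the Doris documentation for the TVF property names your version accepts. Column mappings, WHERE clauses and credentials are deliberately omitted.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class InsertTvfSqlSketch {
    // Quotes a string literal, escaping backslashes first and then double quotes.
    static String quote(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    // Hypothetical helper: renders INSERT INTO <table> SELECT * FROM s3(<props>).
    // Property keys are passed through as given; nothing is validated here.
    static String toInsertTvfSql(String targetTable, Map<String, String> tvfProps) {
        String props = tvfProps.entrySet().stream()
                .map(e -> quote(e.getKey()) + " = " + quote(e.getValue()))
                .collect(Collectors.joining(", "));
        return "INSERT INTO " + targetTable + " SELECT * FROM s3(" + props + ")";
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps the properties in insertion order in the output.
        Map<String, String> props = new LinkedHashMap<>();
        props.put("uri", "s3://bucket/path/data.csv");
        props.put("format", "csv");
        System.out.println(toInsertTvfSql("db1.tbl1", props));
    }
}
```

Running `main` prints `INSERT INTO db1.tbl1 SELECT * FROM s3("uri" = "s3://bucket/path/data.csv", "format" = "csv")`.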
   
   
   
   issue: https://github.com/apache/doris/issues/24221
   
   ## Further comments
   
   If this is a relatively large or complex change, kick off the discussion at [dev@doris.apache.org](mailto:dev@doris.apache.org) by explaining why you chose the solution you did and what alternatives you considered, etc.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


Re: [PR] [feature](Load)(step2)support nereids load job schedule [doris]

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #26356:
URL: https://github.com/apache/doris/pull/26356#issuecomment-1867246267

   PR approved by at least one committer and no changes requested.




Re: [PR] [feature](Load)(step2)support nereids load job schedule [doris]

Posted by "wsjz (via GitHub)" <gi...@apache.org>.
wsjz commented on code in PR #26356:
URL: https://github.com/apache/doris/pull/26356#discussion_r1427678062


##########
fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java:
##########
@@ -74,30 +72,70 @@ public class InsertTask extends AbstractTask {
     }
 
     private String labelName;

Review Comment:
   Now used by getTvfInfo().





Re: [PR] [feature](Load)(step2)support nereids load job schedule [doris]

Posted by "wsjz (via GitHub)" <gi...@apache.org>.
wsjz commented on code in PR #26356:
URL: https://github.com/apache/doris/pull/26356#discussion_r1434217677


##########
fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/load/LabelProcessor.java:
##########
@@ -0,0 +1,181 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.jobs.load;
+
+import org.apache.doris.catalog.Database;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.CaseSensibility;
+import org.apache.doris.common.LabelAlreadyUsedException;
+import org.apache.doris.common.PatternMatcher;
+import org.apache.doris.common.PatternMatcherWrapper;
+import org.apache.doris.job.exception.JobException;
+import org.apache.doris.job.extensions.insert.InsertJob;
+
+import com.google.common.base.Strings;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+
+/**
+ * label manager
+ */
+public class LabelProcessor {
+    private final Map<Long, Map<String, List<InsertJob>>> dbIdToLabelToLoadJobs = new ConcurrentHashMap<>();
+    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
+
+    private void readLock() {
+        lock.readLock().lock();
+    }
+
+    private void readUnlock() {
+        lock.readLock().unlock();
+    }
+
+    private void writeLock() {
+        lock.writeLock().lock();
+    }
+
+    private void writeUnlock() {
+        lock.writeLock().unlock();
+    }
+
+    /**
+     * get jobs with label
+     * @param db db
+     * @return jobs
+     * @throws JobException e
+     */
+    public List<InsertJob> getJobs(Database db) throws JobException {
+        readLock();
+        try {
+            Map<String, List<InsertJob>> labelToLoadJobs = dbIdToLabelToLoadJobs.get(db.getId());
+            if (labelToLoadJobs == null) {
+                throw new JobException("Load job does not exist");
+            }
+            return labelToLoadJobs.values().stream().flatMap(Collection::stream).collect(Collectors.toList());
+        } finally {
+            readUnlock();
+        }
+    }
+
+    /**
+     * add job with label
+     *
+     * @param job job with label
+     * @throws LabelAlreadyUsedException e
+     */
+    public void addJob(InsertJob job) throws LabelAlreadyUsedException {
+        writeLock();
+        try {
+            Map<String, List<InsertJob>> labelToLoadJobs;
+            if (!dbIdToLabelToLoadJobs.containsKey(job.getDbId())) {
+                labelToLoadJobs = new ConcurrentHashMap<>();
+                dbIdToLabelToLoadJobs.put(job.getDbId(), labelToLoadJobs);
+            }
+            labelToLoadJobs = dbIdToLabelToLoadJobs.get(job.getDbId());
+            if (labelToLoadJobs.containsKey(job.getLabelName())) {
+                throw new LabelAlreadyUsedException(job.getLabelName());
+            } else {
+                labelToLoadJobs.put(job.getLabelName(), new ArrayList<>());
+            }
+            labelToLoadJobs.get(job.getLabelName()).add(job);
+        } finally {
+            writeUnlock();
+        }
+    }
+
+    /**
+     * support remove label job
+     * @param dbId db id
+     * @param labelName label name
+     */
+    public void removeJob(long dbId, String labelName) {

Review Comment:
   TODO: this will be used by the method that cleans up old labels.
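The review comment mentions a future method for cleaning up old labels. Below is a rough sketch of how the per-database label index in `LabelProcessor` could support that. It keeps the same dbId → label → jobs shape, rejects duplicate labels within a database, and removes labels whose jobs all satisfy an expiry predicate. The `Job` class, `cleanOldLabels`, `hasLabel` and the expiry rule are hypothetical. The sketch also uses `synchronized` methods instead of the diff's `ReentrantReadWriteLock`, purely to keep it short.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;

public class LabelIndexSketch {
    // Hypothetical stand-in for InsertJob: only the fields the index needs.
    public static final class Job {
        public final long dbId;
        public final String label;
        public final long finishTimeMs;

        public Job(long dbId, String label, long finishTimeMs) {
            this.dbId = dbId;
            this.label = label;
            this.finishTimeMs = finishTimeMs;
        }
    }

    // dbId -> label -> jobs, the same shape as dbIdToLabelToLoadJobs in the diff.
    private final Map<Long, Map<String, List<Job>>> dbIdToLabelToJobs = new ConcurrentHashMap<>();

    // Registers a job, rejecting a label that is already used in the same database.
    public synchronized void addJob(Job job) {
        Map<String, List<Job>> labelToJobs =
                dbIdToLabelToJobs.computeIfAbsent(job.dbId, k -> new ConcurrentHashMap<>());
        if (labelToJobs.containsKey(job.label)) {
            throw new IllegalStateException("Label already used: " + job.label);
        }
        labelToJobs.computeIfAbsent(job.label, k -> new ArrayList<>()).add(job);
    }

    // One possible shape for old-label cleanup: drop every label whose jobs all match isExpired.
    // Returns the number of labels removed across all databases.
    public synchronized int cleanOldLabels(Predicate<Job> isExpired) {
        int removed = 0;
        for (Map<String, List<Job>> labelToJobs : dbIdToLabelToJobs.values()) {
            Iterator<Map.Entry<String, List<Job>>> it = labelToJobs.entrySet().iterator();
            while (it.hasNext()) {
                if (it.next().getValue().stream().allMatch(isExpired)) {
                    it.remove();
                    removed++;
                }
            }
        }
        return removed;
    }

    public synchronized boolean hasLabel(long dbId, String label) {
        Map<String, List<Job>> labelToJobs = dbIdToLabelToJobs.get(dbId);
        return labelToJobs != null && labelToJobs.containsKey(label);
    }
}
```

`computeIfAbsent` collapses the containsKey/put/get sequence used in `addJob` into a single call. Once a label is cleaned up, it becomes available for reuse.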





Re: [PR] [feature](Load)(step2)support nereids load job schedule [doris]

Posted by "morningman (via GitHub)" <gi...@apache.org>.
morningman merged PR #26356:
URL: https://github.com/apache/doris/pull/26356




Re: [PR] [feature](Load)(step2)support nereids load job schedule [doris]

Posted by "wsjz (via GitHub)" <gi...@apache.org>.
wsjz commented on PR #26356:
URL: https://github.com/apache/doris/pull/26356#issuecomment-1859747929

   run buildall




Re: [PR] [feature](Load)(step2)support nereids load job schedule [doris]

Posted by "doris-robot (via GitHub)" <gi...@apache.org>.
doris-robot commented on PR #26356:
URL: https://github.com/apache/doris/pull/26356#issuecomment-1859955145

   (From new machine) TeamCity pipeline, clickbench performance test result:
    the sum of best hot time: 43.64 seconds
    stream load tsv:          579 seconds loaded 74807831229 Bytes, about 123 MB/s
    stream load json:         19 seconds loaded 2358488459 Bytes, about 118 MB/s
    stream load orc:          66 seconds loaded 1101869774 Bytes, about 15 MB/s
    stream load parquet:      32 seconds loaded 861443392 Bytes, about 25 MB/s
    insert into select:       28.6 seconds inserted 10000000 Rows, about 349K ops/s
    storage size:             17219926919 Bytes
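The reported throughputs appear to equal bytes ÷ seconds ÷ 2^20, so "MB" behaves like MiB, truncated rather than rounded. For example, 576 seconds for the TSV file works out to about 123.86 and is reported as 123 MB/s. Likewise, 33 seconds for parquet (about 24.9) is reported as 24 MB/s. The insert figure similarly looks like rows ÷ seconds ÷ 1000, truncated. This is inferred from the numbers in these reports, not from the pipeline's source. The sketch below reproduces the first report under that assumption.

```java
public class ClickbenchThroughputSketch {
    private static final double BYTES_PER_MIB = 1024.0 * 1024.0;

    // Assumption: "MB/s" = bytes / seconds / 2^20, truncated toward zero.
    static long mibPerSecond(long bytes, double seconds) {
        return (long) (bytes / seconds / BYTES_PER_MIB);
    }

    // Assumption: "K ops/s" = rows / seconds / 1000, truncated toward zero.
    static long kiloRowsPerSecond(long rows, double seconds) {
        return (long) (rows / seconds / 1000.0);
    }

    public static void main(String[] args) {
        // Inputs taken from the first clickbench report above.
        System.out.println("tsv:     about " + mibPerSecond(74807831229L, 579) + " MB/s");
        System.out.println("json:    about " + mibPerSecond(2358488459L, 19) + " MB/s");
        System.out.println("orc:     about " + mibPerSecond(1101869774L, 66) + " MB/s");
        System.out.println("parquet: about " + mibPerSecond(861443392L, 32) + " MB/s");
        System.out.println("insert:  about " + kiloRowsPerSecond(10000000L, 28.6) + "K ops/s");
    }
}
```

It prints 123, 118, 15 and 25 MB/s and 349K ops/s, matching the report. With rounding instead of truncation, the insert figure would be 350K.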




Re: [PR] [feature](Load)(step2)support nereids load job schedule [doris]

Posted by "wsjz (via GitHub)" <gi...@apache.org>.
wsjz commented on PR #26356:
URL: https://github.com/apache/doris/pull/26356#issuecomment-1859653214

   run buildall




Re: [PR] [feature](Load)(step2)support nereids load job schedule [doris]

Posted by "wsjz (via GitHub)" <gi...@apache.org>.
wsjz commented on PR #26356:
URL: https://github.com/apache/doris/pull/26356#issuecomment-1858708546

   run buildall




Re: [PR] [feature](Load)(step2)support nereids load job schedule [doris]

Posted by "doris-robot (via GitHub)" <gi...@apache.org>.
doris-robot commented on PR #26356:
URL: https://github.com/apache/doris/pull/26356#issuecomment-1859698163

   (From new machine) TeamCity pipeline, clickbench performance test result:
    the sum of best hot time: 44.84 seconds
    stream load tsv:          576 seconds loaded 74807831229 Bytes, about 123 MB/s
    stream load json:         19 seconds loaded 2358488459 Bytes, about 118 MB/s
    stream load orc:          66 seconds loaded 1101869774 Bytes, about 15 MB/s
    stream load parquet:      32 seconds loaded 861443392 Bytes, about 25 MB/s
    insert into select:       28.9 seconds inserted 10000000 Rows, about 346K ops/s
    storage size:             17216358140 Bytes




Re: [PR] [feature](Load)(step2)support nereids load job schedule [doris]

Posted by "wsjz (via GitHub)" <gi...@apache.org>.
wsjz commented on PR #26356:
URL: https://github.com/apache/doris/pull/26356#issuecomment-1859853077

   run buildall




Re: [PR] [feature](Load)(step2)support nereids load job schedule [doris]

Posted by "doris-robot (via GitHub)" <gi...@apache.org>.
doris-robot commented on PR #26356:
URL: https://github.com/apache/doris/pull/26356#issuecomment-1859544368

   (From new machine) TeamCity pipeline, clickbench performance test result:
    the sum of best hot time: 44.14 seconds
    stream load tsv:          575 seconds loaded 74807831229 Bytes, about 124 MB/s
    stream load json:         19 seconds loaded 2358488459 Bytes, about 118 MB/s
    stream load orc:          66 seconds loaded 1101869774 Bytes, about 15 MB/s
    stream load parquet:      32 seconds loaded 861443392 Bytes, about 25 MB/s
    insert into select:       28.4 seconds inserted 10000000 Rows, about 352K ops/s
    storage size:             17220998870 Bytes




Re: [PR] [feature](Load)(step2)support nereids load job schedule [doris]

Posted by "wsjz (via GitHub)" <gi...@apache.org>.
wsjz commented on PR #26356:
URL: https://github.com/apache/doris/pull/26356#issuecomment-1860194357

   run buildall




Re: [PR] [feature](Load)(step2)support nereids load job schedule [doris]

Posted by "doris-robot (via GitHub)" <gi...@apache.org>.
doris-robot commented on PR #26356:
URL: https://github.com/apache/doris/pull/26356#issuecomment-1859845959

   
   TPC-H test result on machine: 'aliyun_ecs.c7a.8xlarge_32C64G'
   ```
   Tpch sf100 test result on commit bfaf47d2d8d5afd2139a72fa8158f34d4871a719, data reload: false
   
   run tpch-sf100 query with default conf and session variables
   query	cold	hot1	hot2	best hot (ms)
   q1	4731	4468	4460	4460
   q2	368	152	158	152
   q3	1484	1270	1242	1242
   q4	1123	904	880	880
   q5	3124	3174	3183	3174
   q6	244	127	130	127
   q7	979	490	493	490
   q8	2195	2223	2194	2194
   q9	6722	6681	6666	6666
   q10	3233	3255	3274	3255
   q11	320	207	212	207
   q12	351	211	209	209
   q13	4541	3807	3809	3807
   q14	240	207	219	207
   q15	565	523	527	523
   q16	443	378	385	378
   q17	1025	569	460	460
   q18	7161	6984	6918	6918
   q19	1511	1446	1373	1373
   q20	554	320	283	283
   q21	3073	2643	2731	2643
   q22	348	287	282	282
   Total cold run time: 44335 ms
   Total hot run time: 39930 ms
   
   run tpch-sf100 query with default conf and set session variable runtime_filter_mode=off
   query	cold	hot1	hot2	best hot (ms)
   q1	4413	4392	4371	4371
   q2	269	160	172	160
   q3	3561	3532	3523	3523
   q4	2392	2381	2375	2375
   q5	5735	5745	5754	5745
   q6	244	121	125	121
   q7	2413	1865	1858	1858
   q8	3537	3513	3523	3513
   q9	9041	9067	9004	9004
   q10	3887	3980	4005	3980
   q11	512	392	379	379
   q12	758	611	592	592
   q13	4302	3585	3543	3543
   q14	292	260	248	248
   q15	565	517	515	515
   q16	501	473	471	471
   q17	1888	1864	1880	1864
   q18	8666	8190	8289	8190
   q19	1717	1782	1783	1782
   q20	2269	1966	1934	1934
   q21	6545	6218	6166	6166
   q22	484	442	424	424
   Total cold run time: 63991 ms
   Total hot run time: 60758 ms
   ```
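The bot prints the TPC-H columns without labels. Judging from the numbers, each row lists a cold run, two hot runs and the best hot run, all in milliseconds. The last column is the smaller of the two hot runs, "Total cold run time" sums the first timing column, and "Total hot run time" sums the last. This reading is inferred rather than documented by the pipeline. The sketch below checks it against the default-conf table for commit bfaf47d2d8d5afd2139a72fa8158f34d4871a719.

```java
import java.util.Arrays;

public class TpchSummarySketch {
    // Per-query timings in ms (q1..q22), copied from the default-conf table for commit bfaf47d2.
    static final int[] COLD = {4731, 368, 1484, 1123, 3124, 244, 979, 2195, 6722, 3233, 320,
            351, 4541, 240, 565, 443, 1025, 7161, 1511, 554, 3073, 348};
    static final int[] HOT1 = {4468, 152, 1270, 904, 3174, 127, 490, 2223, 6681, 3255, 207,
            211, 3807, 207, 523, 378, 569, 6984, 1446, 320, 2643, 287};
    static final int[] HOT2 = {4460, 158, 1242, 880, 3183, 130, 493, 2194, 6666, 3274, 212,
            209, 3809, 219, 527, 385, 460, 6918, 1373, 283, 2731, 282};

    // Best hot time per query: the faster of the two hot runs.
    static int[] bestHot(int[] hot1, int[] hot2) {
        int[] best = new int[hot1.length];
        for (int i = 0; i < hot1.length; i++) {
            best[i] = Math.min(hot1[i], hot2[i]);
        }
        return best;
    }

    public static void main(String[] args) {
        System.out.println("Total cold run time: " + Arrays.stream(COLD).sum() + " ms");
        System.out.println("Total hot run time: " + Arrays.stream(bestHot(HOT1, HOT2)).sum() + " ms");
    }
}
```

It prints 44335 ms and 39930 ms, the same totals the bot reports for that run.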
   




Re: [PR] [feature](Load)(step2)support nereids load job schedule [doris]

Posted by "doris-robot (via GitHub)" <gi...@apache.org>.
doris-robot commented on PR #26356:
URL: https://github.com/apache/doris/pull/26356#issuecomment-1859836459

   (From new machine) TeamCity pipeline, clickbench performance test result:
    the sum of best hot time: 43.98 seconds
    stream load tsv:          575 seconds loaded 74807831229 Bytes, about 124 MB/s
    stream load json:         19 seconds loaded 2358488459 Bytes, about 118 MB/s
    stream load orc:          66 seconds loaded 1101869774 Bytes, about 15 MB/s
    stream load parquet:      32 seconds loaded 861443392 Bytes, about 25 MB/s
    insert into select:       28.7 seconds inserted 10000000 Rows, about 348K ops/s
    storage size:             17219842626 Bytes




Re: [PR] [feature](Load)(step2)support nereids load job schedule [doris]

Posted by "doris-robot (via GitHub)" <gi...@apache.org>.
doris-robot commented on PR #26356:
URL: https://github.com/apache/doris/pull/26356#issuecomment-1859975066

   
   TPC-H test result on machine: 'aliyun_ecs.c7a.8xlarge_32C64G'
   ```
   Tpch sf100 test result on commit 1a3e98ff0a58621903f0c6728327143ef13ada75, data reload: false
   
   run tpch-sf100 query with default conf and session variables
   query	cold	hot1	hot2	best hot (ms)
   q1	4704	4455	4508	4455
   q2	368	148	159	148
   q3	1450	1235	1279	1235
   q4	1112	884	899	884
   q5	3131	3136	3132	3132
   q6	243	125	127	125
   q7	998	483	487	483
   q8	2189	2234	2203	2203
   q9	6691	6657	6688	6657
   q10	3220	3278	3257	3257
   q11	315	206	201	201
   q12	349	208	206	206
   q13	4555	3762	3792	3762
   q14	247	217	208	208
   q15	565	520	522	520
   q16	438	381	379	379
   q17	1016	607	537	537
   q18	7236	6874	6933	6874
   q19	1530	1479	1388	1388
   q20	546	311	319	311
   q21	3054	2676	2683	2676
   q22	347	279	282	279
   Total cold run time: 44304 ms
   Total hot run time: 39920 ms
   
   run tpch-sf100 query with default conf and set session variable runtime_filter_mode=off
   query	cold	hot1	hot2	best hot (ms)
   q1	4431	4432	4393	4393
   q2	264	164	170	164
   q3	3538	3525	3525	3525
   q4	2379	2371	2352	2352
   q5	5736	5723	5734	5723
   q6	241	121	123	121
   q7	2384	1903	1867	1867
   q8	3528	3529	3523	3523
   q9	9005	8989	8956	8956
   q10	3887	3997	3997	3997
   q11	499	384	383	383
   q12	767	600	599	599
   q13	4281	3586	3547	3547
   q14	280	258	245	245
   q15	567	522	517	517
   q16	499	442	467	442
   q17	1883	1852	1846	1846
   q18	8636	8408	8355	8355
   q19	1726	1809	1794	1794
   q20	2267	1952	1944	1944
   q21	6534	6204	6179	6179
   q22	496	430	435	430
   Total cold run time: 63828 ms
   Total hot run time: 60902 ms
   ```
   




Re: [PR] [feature](Load)(step2)support nereids load job schedule [doris]

Posted by "doris-robot (via GitHub)" <gi...@apache.org>.
doris-robot commented on PR #26356:
URL: https://github.com/apache/doris/pull/26356#issuecomment-1862073182

   (From new machine) TeamCity pipeline, clickbench performance test result:
    the sum of best hot time: 44.63 seconds
    stream load tsv:          581 seconds loaded 74807831229 Bytes, about 122 MB/s
    stream load json:         19 seconds loaded 2358488459 Bytes, about 118 MB/s
    stream load orc:          66 seconds loaded 1101869774 Bytes, about 15 MB/s
    stream load parquet:      33 seconds loaded 861443392 Bytes, about 24 MB/s
    insert into select:       29.2 seconds inserted 10000000 Rows, about 342K ops/s
    storage size:             17221068688 Bytes




Re: [PR] [feature](Load)(step2)support nereids load job schedule [doris]

Posted by "wsjz (via GitHub)" <gi...@apache.org>.
wsjz commented on PR #26356:
URL: https://github.com/apache/doris/pull/26356#issuecomment-1868817492

   run buildall




Re: [PR] [feature](Load)(step2)support nereids load job schedule [doris]

Posted by "wsjz (via GitHub)" <gi...@apache.org>.
wsjz commented on PR #26356:
URL: https://github.com/apache/doris/pull/26356#issuecomment-1859385614

   run buildall




Re: [PR] [feature](Load)(step2)support nereids load job schedule [doris]

Posted by "wsjz (via GitHub)" <gi...@apache.org>.
wsjz commented on code in PR #26356:
URL: https://github.com/apache/doris/pull/26356#discussion_r1427703299


##########
fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/load/LoadMgr.java:
##########
@@ -0,0 +1,340 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.jobs.load;
+
+import org.apache.doris.analysis.CompoundPredicate;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.CaseSensibility;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.LabelAlreadyUsedException;
+import org.apache.doris.common.PatternMatcher;
+import org.apache.doris.common.PatternMatcherWrapper;
+import org.apache.doris.common.util.LogBuilder;
+import org.apache.doris.common.util.LogKey;
+import org.apache.doris.job.base.JobExecuteType;
+import org.apache.doris.job.base.JobExecutionConfiguration;
+import org.apache.doris.job.common.JobStatus;
+import org.apache.doris.job.exception.JobException;
+import org.apache.doris.job.extensions.insert.InsertJob;
+import org.apache.doris.job.manager.JobManager;
+import org.apache.doris.load.loadv2.JobState;
+import org.apache.doris.nereids.jobs.load.replay.ReplayLoadLog;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+
+/**
+ * load manager
+ */
+public class LoadMgr {
+    private static final Logger LOG = LogManager.getLogger(LoadMgr.class);
+    private Map<Long, InsertJob> loadIdToJob = new HashMap<>();
+    private final Map<Long, Map<String, List<InsertJob>>> dbIdToLabelToLoadJobs = new ConcurrentHashMap<>();
+
+    // lock for load jobs
+    // the lock is private and must be acquired after the db lock
+    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
+
+    private void readLock() {
+        lock.readLock().lock();
+    }
+
+    private void readUnlock() {
+        lock.readLock().unlock();
+    }
+
+    private void writeLock() {
+        lock.writeLock().lock();
+    }
+
+    private void writeUnlock() {
+        lock.writeLock().unlock();
+    }
+
+    private JobManager<InsertJob, ?> getJobManager() {
+        return Env.getCurrentEnv().getJobManager();
+    }
+
+    /**
+     * add load job and add tasks
+     * @param loadJob job
+     */
+    public void addLoadJob(InsertJob loadJob) throws DdlException {
+        writeLock();
+        try {
+            Map<String, List<InsertJob>> labelToLoadJobs = dbIdToLabelToLoadJobs.get(loadJob.getDbId());
+            if (labelToLoadJobs != null && labelToLoadJobs.containsKey(loadJob.getLabel())) {
+                throw new LabelAlreadyUsedException(loadJob.getLabel());
+            }
+            unprotectAddJob(loadJob);
+        } catch (LabelAlreadyUsedException e) {
+            throw new RuntimeException(e);
+        } finally {
+            writeUnlock();
+        }
+    }
+
+    private void unprotectAddJob(InsertJob job) throws DdlException {
+        loadIdToJob.put(job.getJobId(), job);
+        try {
+            getJobManager().registerJob(job);
+            if (!dbIdToLabelToLoadJobs.containsKey(job.getDbId())) {
+                dbIdToLabelToLoadJobs.put(job.getDbId(), new ConcurrentHashMap<>());
+            }
+            Map<String, List<InsertJob>> labelToLoadJobs = dbIdToLabelToLoadJobs.get(job.getDbId());
+            if (!labelToLoadJobs.containsKey(job.getLabel())) {
+                labelToLoadJobs.put(job.getLabel(), new ArrayList<>());
+            }
+            labelToLoadJobs.get(job.getLabel()).add(job);
+        } catch (org.apache.doris.job.exception.JobException e) {
+            throw new DdlException(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * replay load job
+     * @param replayLoadLog load log
+     * @throws DdlException ex
+     */
+    public void replayLoadJob(ReplayLoadLog replayLoadLog) throws DdlException {
+        writeLock();
+        try {
+            if (replayLoadLog instanceof ReplayLoadLog.ReplayCreateLoadLog) {
+                InsertJob loadJob = new InsertJob((ReplayLoadLog.ReplayCreateLoadLog) replayLoadLog);
+                JobExecutionConfiguration jobConfig = new JobExecutionConfiguration();
+                jobConfig.setExecuteType(JobExecuteType.INSTANT);
+                loadJob.setJobConfig(jobConfig);
+                addLoadJob(loadJob);
+                LOG.info(new LogBuilder(LogKey.LOAD_JOB, loadJob.getJobId()).add("msg", "replay create load job")
+                        .build());
+            } else if (replayLoadLog instanceof ReplayLoadLog.ReplayEndLoadLog) {
+                InsertJob job = loadIdToJob.get(replayLoadLog.getId());
+                if (job == null) {
+                    // This should not happen.
+                    // Previously, when a user submitted a job with an already-used label, an END_LOAD_JOB edit log
+                    // was written but the job was not added to 'loadIdToJob', so the job retrieved here was null.
+                    // That bug has been fixed.
+                    // Just add a log here to observe.
+                    LOG.warn("job does not exist when replaying end load job edit log: {}", replayLoadLog);
+                    return;
+                }
+                job.unprotectReadEndOperation((ReplayLoadLog.ReplayEndLoadLog) replayLoadLog);
+                LOG.info(new LogBuilder(LogKey.LOAD_JOB, replayLoadLog.getId()).add("operation", replayLoadLog)
+                        .add("msg", "replay end load job").build());
+            } else {
+                throw new DdlException("Unsupported replay job type. ");
+            }
+        } finally {
+            writeUnlock();
+        }
+    }
+
+    //    public void updateJobProgress(Long jobId, Long beId, TUniqueId loadId, TUniqueId fragmentId, long scannedRows,
+    //                                  long scannedBytes, boolean isDone) {
+    //        LoadJobExecutor job = loadIdToJob.get(jobId);
+    //        if (job != null) {
+    //            job.updateLoadingStatus(beId, loadId, fragmentId, scannedRows, scannedBytes, isDone);
+    //        }
+    //    }
+
+    /**
+     * cancel job
+     *
+     * @param dbName   dbName
+     * @param label    job label
+     * @param state    job state
+     * @param operator filter operator, like or equals
+     */
+    public void cancelLoadJob(String dbName, String label, String state, CompoundPredicate.Operator operator)
+            throws JobException, AnalysisException, DdlException {
+        Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(dbName);
+        // List of load jobs waiting to be cancelled
+        List<InsertJob> uncompletedLoadJob;
+        readLock();
+        try {
+            Map<String, List<InsertJob>> labelToLoadJobs = dbIdToLabelToLoadJobs.get(db.getId());
+            if (labelToLoadJobs == null) {
+                throw new JobException("Load job does not exist");
+            }
+            List<InsertJob> matchLoadJobs = Lists.newArrayList();
+            addNeedCancelLoadJob(label, state, operator,
+                    labelToLoadJobs.values().stream().flatMap(Collection::stream).collect(Collectors.toList()),
+                    matchLoadJobs);
+            if (matchLoadJobs.isEmpty()) {
+                throw new JobException("Load job does not exist");
+            }
+            // check state here
+            uncompletedLoadJob =
+                    matchLoadJobs.stream().filter(InsertJob::isRunning)
+                            .collect(Collectors.toList());
+            if (uncompletedLoadJob.isEmpty()) {
+                throw new JobException("There is no uncompleted job");
+            }
+        } finally {
+            readUnlock();
+        }
+        for (InsertJob loadJob : uncompletedLoadJob) {
+            try {
+                loadJob.cancelJob();
+            } catch (JobException e) {
+                LOG.warn("Fail to cancel job, its label: {}", loadJob.getLabel());
+            }
+        }
+    }
+
+    private static void addNeedCancelLoadJob(String label, String state,

Review Comment:
   Not required, but I suggest making a method static if it doesn't use any instance fields, since that makes later refactoring easier.
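A minimal sketch of the point above: a label filter that reads only its arguments can be `static`, so it can be tested and moved without a manager instance. The `Job` class and `matchByLabel` are hypothetical stand-ins, not the PR's `InsertJob` or `addNeedCancelLoadJob`. The LIKE handling is a simplified, case-sensitive assumption (`%` and `_` only, no escapes).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

public final class CancelFilterSketch {
    // Hypothetical stand-in for InsertJob: only the field the filter reads.
    static final class Job {
        final String label;

        Job(String label) {
            this.label = label;
        }
    }

    private CancelFilterSketch() {
    }

    // Static: the result depends only on the arguments, never on manager fields,
    // so it can be unit-tested or moved without a LoadMgr instance.
    static List<Job> matchByLabel(String label, boolean useLike, List<Job> jobs) {
        List<Job> matched = new ArrayList<>();
        Pattern pattern = useLike ? likeToRegex(label) : null;
        for (Job job : jobs) {
            boolean hit = useLike ? pattern.matcher(job.label).matches() : job.label.equals(label);
            if (hit) {
                matched.add(job);
            }
        }
        return matched;
    }

    // Minimal, case-sensitive LIKE translation: '%' matches any run of characters,
    // '_' matches exactly one; escape sequences are not handled.
    static Pattern likeToRegex(String like) {
        StringBuilder regex = new StringBuilder();
        for (char c : like.toCharArray()) {
            if (c == '%') {
                regex.append(".*");
            } else if (c == '_') {
                regex.append('.');
            } else {
                regex.append(Pattern.quote(String.valueOf(c)));
            }
        }
        return Pattern.compile(regex.toString(), Pattern.DOTALL);
    }
}
```

The running-state check that `cancelLoadJob` applies afterwards stays separate, so the static helper only answers "which labels match".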





Re: [PR] [feature](Load)(step2)support nereids load job schedule [doris]

Posted by "doris-robot (via GitHub)" <gi...@apache.org>.
doris-robot commented on PR #26356:
URL: https://github.com/apache/doris/pull/26356#issuecomment-1858727499

   (From new machine) TeamCity pipeline, clickbench performance test result:
    the sum of best hot time: 45.22 seconds
    stream load tsv:          589 seconds loaded 74807831229 Bytes, about 121 MB/s
    stream load json:         19 seconds loaded 2358488459 Bytes, about 118 MB/s
    stream load orc:          67 seconds loaded 1101869774 Bytes, about 15 MB/s
    stream load parquet:      33 seconds loaded 861443392 Bytes, about 24 MB/s
    insert into select:       28.9 seconds inserted 10000000 Rows, about 346K ops/s
    storage size:             17219958200 Bytes
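The stream-load MB/s figures above match bytes / seconds / 2^20 with the fraction dropped. Decimal megabytes would put the TSV run at about 127 MB/s, and the ORC and Parquet runs work out to about 15.7 and 24.9 before truncation. This reading is inferred from the reported numbers, not taken from the pipeline's source. A small check:

```java
public final class ThroughputSketch {
    private ThroughputSketch() {
    }

    // Bytes per second in units of 2^20 bytes, truncated by long division at each step.
    static long mibPerSecond(long bytes, long seconds) {
        return bytes / seconds / (1024L * 1024L);
    }

    public static void main(String[] args) {
        System.out.println(mibPerSecond(74807831229L, 589)); // 121 (tsv)
        System.out.println(mibPerSecond(2358488459L, 19));   // 118 (json)
        System.out.println(mibPerSecond(1101869774L, 67));   // 15 (orc)
        System.out.println(mibPerSecond(861443392L, 33));    // 24 (parquet)
    }
}
```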




Re: [PR] [feature](Load)(step2)support nereids load job schedule [doris]

Posted by "wsjz (via GitHub)" <gi...@apache.org>.
wsjz commented on code in PR #26356:
URL: https://github.com/apache/doris/pull/26356#discussion_r1427678969


##########
fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java:
##########
@@ -74,30 +72,70 @@ public class InsertTask extends AbstractTask {
     }
 
     private String labelName;
-
     private InsertIntoTableCommand command;
-
     private StmtExecutor stmtExecutor;
-
     private ConnectContext ctx;
-
     private String sql;
-
     private String currentDb;
-
     private UserIdentity userIdentity;
-
+    private LoadStatistic loadStatistic;
     private AtomicBoolean isCanceled = new AtomicBoolean(false);
-
     private AtomicBoolean isFinished = new AtomicBoolean(false);
-
     private static final String LABEL_SPLITTER = "_";
 
+    private FailMsg failMsg;
+    @Getter
+    private String trackingUrl;
 
     @Getter
     @Setter
-    private LoadJob loadJob;
+    private InsertJob jobInfo;
+    private TaskType taskType = TaskType.PENDING;
+    private MergeType mergeType = MergeType.APPEND;
+
+    /**
+     * task merge type
+     */
+    enum MergeType {
+        MERGE,
+        APPEND,
+        DELETE
+    }
+
+    /**
+     * task type
+     */
+    enum TaskType {
+        UNKNOWN, // this is only for ISSUE #2354
+        PENDING,
+        LOADING,
+        FINISHED,
+        FAILED,
+        CANCELLED
+    }
+
+    public InsertTask(InsertIntoTableCommand insertInto,
+                      ConnectContext ctx, StmtExecutor executor, LoadStatistic statistic) {
+        this(null, insertInto, ctx, executor, statistic);
+    }
+
+    public InsertTask(String labelName, String currentDb, String sql, UserIdentity userIdentity) {
+        this.labelName = labelName;
+        this.sql = sql;
+        this.currentDb = currentDb;
+        this.userIdentity = userIdentity;
+        setTaskId(Env.getCurrentEnv().getNextId());
+    }
 
+    public InsertTask(String labelName, InsertIntoTableCommand insertInto,
+                       ConnectContext ctx, StmtExecutor executor, LoadStatistic statistic) {
+        this.labelName = labelName;

Review Comment:
   The first constructor doesn't need `loadStatistic`.





Re: [PR] [feature](Load)(step2)support nereids load job schedule [doris]

Posted by "doris-robot (via GitHub)" <gi...@apache.org>.
doris-robot commented on PR #26356:
URL: https://github.com/apache/doris/pull/26356#issuecomment-1858737446

   
   <details>
   <summary>TPC-H test result on machine: 'aliyun_ecs.c7a.8xlarge_32C64G'</summary>
   
   ```
   Tpch sf100 test result on commit 09fcd7c63648ab365f94b9859fccc7812fb09b03, data reload: true
   
   run tpch-sf100 query with default conf and session variables
   q1	4650	4427	4413	4413
   q2	381	120	118	118
   q3	1456	1221	1204	1204
   q4	1153	911	866	866
   q5	3217	3228	3206	3206
   q6	244	121	123	121
   q7	964	526	542	526
   q8	2155	2180	2189	2180
   q9	7176	6692	6659	6659
   q10	3219	3271	3265	3265
   q11	324	202	201	201
   q12	359	214	215	214
   q13	4936	3851	3823	3823
   q14	237	203	207	203
   q15	566	507	518	507
   q16	461	381	381	381
   q17	1016	609	580	580
   q18	7407	7035	6731	6731
   q19	1555	1400	1451	1400
   q20	501	284	290	284
   q21	3059	2629	2659	2629
   q22	344	271	277	271
   Total cold run time: 45380 ms
   Total hot run time: 39782 ms
   
   run tpch-sf100 query with default conf and set session variable runtime_filter_mode=off
   q1	4422	4397	4424	4397
   q2	263	160	165	160
   q3	3508	3492	3489	3489
   q4	2391	2393	2391	2391
   q5	5708	5697	5708	5697
   q6	237	121	122	121
   q7	2375	1851	1842	1842
   q8	3519	3523	3518	3518
   q9	9284	9027	9161	9027
   q10	3917	4005	4023	4005
   q11	504	385	394	385
   q12	768	602	611	602
   q13	4309	3572	3571	3571
   q14	289	246	254	246
   q15	585	528	526	526
   q16	511	463	496	463
   q17	1868	1866	1836	1836
   q18	8517	8097	8337	8097
   q19	1714	1765	1721	1721
   q20	2274	1969	1937	1937
   q21	6508	6240	6111	6111
   q22	501	427	429	427
   Total cold run time: 63972 ms
   Total hot run time: 60569 ms
   ```
   </details>
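Neither TPC-H block labels its columns. In the default-session-variables block, the totals are consistent with reading each row as: query, cold run, hot run 1, hot run 2, best hot run (all in ms). "Total cold run time" equals the sum of the first timing column, and each best value is the smaller of the two hot runs (q10 shows the cold run is excluded). This layout is inferred from the numbers, not from pipeline documentation. The sketch below recomputes both totals from that block:

```java
public final class TpchTotalsSketch {
    // q1..q22 from the default-conf block: {cold run, hot run 1, hot run 2}, in ms.
    static final int[][] RUNS = {
        {4650, 4427, 4413}, {381, 120, 118}, {1456, 1221, 1204}, {1153, 911, 866},
        {3217, 3228, 3206}, {244, 121, 123}, {964, 526, 542}, {2155, 2180, 2189},
        {7176, 6692, 6659}, {3219, 3271, 3265}, {324, 202, 201}, {359, 214, 215},
        {4936, 3851, 3823}, {237, 203, 207}, {566, 507, 518}, {461, 381, 381},
        {1016, 609, 580}, {7407, 7035, 6731}, {1555, 1400, 1451}, {501, 284, 290},
        {3059, 2629, 2659}, {344, 271, 277},
    };

    private TpchTotalsSketch() {
    }

    // "Total cold run time": sum of the first run of every query.
    static int coldTotal() {
        int sum = 0;
        for (int[] r : RUNS) {
            sum += r[0];
        }
        return sum;
    }

    // "Total hot run time": sum of each query's best (smaller) hot run.
    static int hotTotal() {
        int sum = 0;
        for (int[] r : RUNS) {
            sum += Math.min(r[1], r[2]);
        }
        return sum;
    }
}
```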
   




Re: [PR] [feature](Load)(step2)support nereids load job schedule [doris]

Posted by "morningman (via GitHub)" <gi...@apache.org>.
morningman commented on code in PR #26356:
URL: https://github.com/apache/doris/pull/26356#discussion_r1424224847


##########
fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/load/LoadMgr.java:
##########
@@ -0,0 +1,314 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.jobs.load;
+
+import org.apache.doris.analysis.CompoundPredicate;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.CaseSensibility;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.LabelAlreadyUsedException;
+import org.apache.doris.common.PatternMatcher;
+import org.apache.doris.common.PatternMatcherWrapper;
+import org.apache.doris.common.util.LogBuilder;
+import org.apache.doris.common.util.LogKey;
+import org.apache.doris.job.common.JobStatus;
+import org.apache.doris.job.extensions.insert.InsertJob;
+import org.apache.doris.job.manager.JobManager;
+import org.apache.doris.load.DataTransFormMgr;
+import org.apache.doris.load.FailMsg;
+import org.apache.doris.load.loadv2.JobState;
+import org.apache.doris.nereids.jobs.load.replay.ReplayLoadLog;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+/**
+ * load manager
+ */
+public class LoadMgr extends DataTransFormMgr {
+    private static final Logger LOG = LogManager.getLogger(LoadMgr.class);
+    private Map<Long, InsertJob> loadIdToJob = new HashMap<>();
+    private Map<String, Long> labelToLoadJobId = new HashMap<>();
+    private Map<Long, Map<String, List<InsertJob>>> dbIdToLabelToLoadJobs = new ConcurrentHashMap<>();
+
+    private JobManager<InsertJob, ?> getJobManager() {
+        return Env.getCurrentEnv().getJobManager();
+    }
+
+    /**
+     * add load job and add tasks
+     * @param loadJob job
+     */
+    public void addLoadJob(InsertJob loadJob) throws DdlException {
+        writeLock();
+        try {
+            if (labelToLoadJobId.containsKey(loadJob.getLabel())) {

Review Comment:
   Labels are unique within a database, not globally.
   This check should use `dbIdToLabelToLoadJobs` instead of `labelToLoadJobId`.
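A minimal sketch of a per-database label check, using the same nested-map shape as `LoadMgr.dbIdToLabelToLoadJobs` but with job ids standing in for `InsertJob`. It assumes a label is rejected whenever it already exists in the same database. The real rule may also need to look at job state, which this sketch ignores. `tryAddJob` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public final class PerDbLabelSketch {
    // Same shape as LoadMgr.dbIdToLabelToLoadJobs, with job ids standing in for InsertJob.
    private final Map<Long, Map<String, List<Long>>> dbIdToLabelToJobIds = new ConcurrentHashMap<>();

    // Returns false where LoadMgr would throw LabelAlreadyUsedException.
    // synchronized plays the role of LoadMgr's writeLock()/writeUnlock() pair.
    synchronized boolean tryAddJob(long dbId, String label, long jobId) {
        Map<String, List<Long>> labelToJobIds =
                dbIdToLabelToJobIds.computeIfAbsent(dbId, k -> new ConcurrentHashMap<>());
        if (labelToJobIds.containsKey(label)) {
            return false; // label already used in this database
        }
        labelToJobIds.computeIfAbsent(label, k -> new ArrayList<>()).add(jobId);
        return true;
    }
}
```

The same label in a different database is accepted, which a single global `labelToLoadJobId` map would reject.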



##########
fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java:
##########
@@ -75,55 +118,147 @@ public class InsertJob extends AbstractJob<InsertTask, Map> {
         COLUMN_TO_INDEX = builder.build();
     }
 
-    @SerializedName(value = "lp")
-    String labelPrefix;
+    @SerializedName("taskIdList")
+    ConcurrentLinkedQueue<Long> taskIdList;
 
-    InsertIntoTableCommand command;
+    private final long dbId;
+    private String labelName;
+    private List<InsertIntoTableCommand> plans;
+    private InsertJob.LoadType loadType;
+    // 0: the job status is pending
+    // n/100: n is the number of task which has been finished
+    // 99: all tasks have been finished
+    // 100: txn status is visible and load has been finished
+    private int progress;
+    private long createTimestamp = System.currentTimeMillis();
+    private FailMsg failMsg;
+    private LoadStatistic loadStatistic = new LoadStatistic();
+    private Set<Long> finishedTaskIds = new HashSet<>();
+    private Set<String> tableNames;
+    private ConcurrentHashMap<Long, InsertTask> idToTasks = new ConcurrentHashMap<>();
+    private Map<String, String> properties;
+    private AuthorizationInfo authorizationInfo;
+    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
+
+    private ConnectContext ctx;
+    private StmtExecutor stmtExecutor;
+    private List<ErrorTabletInfo> errorTabletInfos = new ArrayList<>();
+    private List<TabletCommitInfo> commitInfos = new ArrayList<>();
 
-    StmtExecutor stmtExecutor;
+    // max save task num, do we need to config it?
+    private static final int MAX_SAVE_TASK_NUM = 100;
 
-    ConnectContext ctx;
+    /**
+     * load job type
+     */
+    public enum LoadType {
+        BULK,
+        SPARK,
+        LOCAL_FILE,
+        UNKNOWN
 
-    @SerializedName("tis")
-    ConcurrentLinkedQueue<Long> taskIdList;
+    }
 
-    // max save task num, do we need to config it?
-    private static final int MAX_SAVE_TASK_NUM = 100;
+    public InsertJob(ReplayLoadLog.ReplayCreateLoadLog replayLoadLog) {
+        super(replayLoadLog.getId());
+        setJobId(replayLoadLog.getId());
+        this.dbId = replayLoadLog.getDbId();
+    }
+
+    public InsertJob(Long jobId, String jobName,
+                     JobStatus jobStatus,
+                     String currentDbName,
+                     String comment,
+                     UserIdentity createUser,
+                     JobExecutionConfiguration jobConfig,
+                     Long createTimeMs,
+                     String executeSql) {
+        super(jobId, jobName, jobStatus, currentDbName, comment, createUser,

Review Comment:
   You use `label` as the job's name,
   but a job's name needs to be globally unique.
   So I think the job name should be `dbid+label+uuid`.
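A minimal sketch of the suggested naming scheme. The `_` separator and the helper name `uniqueJobName` are assumptions. The label stays readable in the name, while the db id and a random UUID keep names distinct across databases and repeated submissions.

```java
import java.util.UUID;

public final class JobNameSketch {
    private JobNameSketch() {
    }

    // Hypothetical helper: db id + label + random UUID, joined with '_'.
    static String uniqueJobName(long dbId, String label) {
        return dbId + "_" + label + "_" + UUID.randomUUID();
    }
}
```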



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/load/LoadMgr.java:
##########
@@ -0,0 +1,314 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.jobs.load;
+
+import org.apache.doris.analysis.CompoundPredicate;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.CaseSensibility;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.LabelAlreadyUsedException;
+import org.apache.doris.common.PatternMatcher;
+import org.apache.doris.common.PatternMatcherWrapper;
+import org.apache.doris.common.util.LogBuilder;
+import org.apache.doris.common.util.LogKey;
+import org.apache.doris.job.common.JobStatus;
+import org.apache.doris.job.extensions.insert.InsertJob;
+import org.apache.doris.job.manager.JobManager;
+import org.apache.doris.load.DataTransFormMgr;
+import org.apache.doris.load.FailMsg;
+import org.apache.doris.load.loadv2.JobState;
+import org.apache.doris.nereids.jobs.load.replay.ReplayLoadLog;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+/**
+ * load manager
+ */
+public class LoadMgr extends DataTransFormMgr {
+    private static final Logger LOG = LogManager.getLogger(LoadMgr.class);
+    private Map<Long, InsertJob> loadIdToJob = new HashMap<>();
+    private Map<String, Long> labelToLoadJobId = new HashMap<>();
+    private Map<Long, Map<String, List<InsertJob>>> dbIdToLabelToLoadJobs = new ConcurrentHashMap<>();
+
+    private JobManager<InsertJob, ?> getJobManager() {
+        return Env.getCurrentEnv().getJobManager();
+    }
+
+    /**
+     * add load job and add tasks
+     * @param loadJob job
+     */
+    public void addLoadJob(InsertJob loadJob) throws DdlException {
+        writeLock();
+        try {
+            if (labelToLoadJobId.containsKey(loadJob.getLabel())) {
+                throw new LabelAlreadyUsedException(loadJob.getLabel());
+            }
+            unprotectAddJob(loadJob);
+            Env.getCurrentEnv().getEditLog().logLoadCreate(ReplayLoadLog.logCreateLoadOperation(loadJob));

Review Comment:
   `unprotectAddJob()` -> `registerJob()` already writes an edit log,
   and this line writes another one.
   I think we should merge these two logs.
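A minimal sketch of what merging could look like: one record carries everything replay needs for both the scheduler-side registration and the label index, so the write path emits exactly one entry and replay rebuilds both structures from it. `CreateLoadJobRecord` and the in-memory "edit log" are hypothetical stand-ins, not Doris's `EditLog` API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public final class MergedEditLogSketch {
    // Hypothetical combined entry for both the job registration and the label index.
    static final class CreateLoadJobRecord {
        final long jobId;
        final long dbId;
        final String label;

        CreateLoadJobRecord(long jobId, long dbId, String label) {
            this.jobId = jobId;
            this.dbId = dbId;
            this.label = label;
        }
    }

    // Stand-in for the persistent edit log: an append-only list.
    final List<CreateLoadJobRecord> editLog = new ArrayList<>();
    // Scheduler view: job id -> label.
    final Map<Long, String> registeredJobs = new HashMap<>();
    // Load manager view: db id -> label -> job id.
    final Map<Long, Map<String, Long>> labelIndex = new HashMap<>();

    // Write path: update memory, then write exactly one log entry.
    void addLoadJob(long jobId, long dbId, String label) {
        CreateLoadJobRecord record = new CreateLoadJobRecord(jobId, dbId, label);
        apply(record);
        editLog.add(record);
    }

    // Replay path: the same single entry rebuilds both structures.
    void replay(CreateLoadJobRecord record) {
        apply(record);
    }

    private void apply(CreateLoadJobRecord record) {
        registeredJobs.put(record.jobId, record.label);
        labelIndex.computeIfAbsent(record.dbId, k -> new HashMap<>()).put(record.label, record.jobId);
    }
}
```

With a single entry, a crash cannot persist the registration without the label index (or the reverse), and write and replay share one `apply` step.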



##########
fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java:
##########
@@ -75,55 +118,147 @@ public class InsertJob extends AbstractJob<InsertTask, Map> {
         COLUMN_TO_INDEX = builder.build();
     }
 
-    @SerializedName(value = "lp")
-    String labelPrefix;
+    @SerializedName("taskIdList")
+    ConcurrentLinkedQueue<Long> taskIdList;
 
-    InsertIntoTableCommand command;
+    private final long dbId;
+    private String labelName;
+    private List<InsertIntoTableCommand> plans;
+    private InsertJob.LoadType loadType;
+    // 0: the job status is pending
+    // n/100: n is the number of task which has been finished
+    // 99: all tasks have been finished
+    // 100: txn status is visible and load has been finished
+    private int progress;
+    private long createTimestamp = System.currentTimeMillis();
+    private FailMsg failMsg;
+    private LoadStatistic loadStatistic = new LoadStatistic();
+    private Set<Long> finishedTaskIds = new HashSet<>();
+    private Set<String> tableNames;
+    private ConcurrentHashMap<Long, InsertTask> idToTasks = new ConcurrentHashMap<>();
+    private Map<String, String> properties;
+    private AuthorizationInfo authorizationInfo;
+    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
+
+    private ConnectContext ctx;
+    private StmtExecutor stmtExecutor;
+    private List<ErrorTabletInfo> errorTabletInfos = new ArrayList<>();
+    private List<TabletCommitInfo> commitInfos = new ArrayList<>();
 
-    StmtExecutor stmtExecutor;
+    // max save task num, do we need to config it?
+    private static final int MAX_SAVE_TASK_NUM = 100;
 
-    ConnectContext ctx;
+    /**
+     * load job type
+     */
+    public enum LoadType {
+        BULK,
+        SPARK,
+        LOCAL_FILE,
+        UNKNOWN
 
-    @SerializedName("tis")
-    ConcurrentLinkedQueue<Long> taskIdList;
+    }
 
-    // max save task num, do we need to config it?
-    private static final int MAX_SAVE_TASK_NUM = 100;
+    public InsertJob(ReplayLoadLog.ReplayCreateLoadLog replayLoadLog) {
+        super(replayLoadLog.getId());
+        setJobId(replayLoadLog.getId());
+        this.dbId = replayLoadLog.getDbId();
+    }
+
+    public InsertJob(Long jobId, String jobName,
+                     JobStatus jobStatus,
+                     String currentDbName,
+                     String comment,
+                     UserIdentity createUser,
+                     JobExecutionConfiguration jobConfig,
+                     Long createTimeMs,
+                     String executeSql) {
+        super(jobId, jobName, jobStatus, currentDbName, comment, createUser,
+                jobConfig, createTimeMs, executeSql, null);
+        this.dbId = ConnectContext.get().getCurrentDbId();
+    }
+
+    public InsertJob(ConnectContext ctx,
+                      StmtExecutor executor,
+                      String labelName,
+                      List<InsertIntoTableCommand> plans,
+                      Set<String> sinkTableNames,
+                      Map<String, String> properties,
+                      String comment,
+                      JobExecutionConfiguration jobConfig) {
+        super(Env.getCurrentEnv().getNextId(), labelName, JobStatus.RUNNING, null,
+                comment, ctx.getCurrentUserIdentity(), jobConfig);
+        this.ctx = ctx;
+        this.plans = plans;
+        this.stmtExecutor = executor;
+        this.dbId = ctx.getCurrentDbId();
+        this.labelName = labelName;
+        this.tableNames = sinkTableNames;
+        this.properties = properties;
+        // TODO: not support other type yet
+        this.loadType = InsertJob.LoadType.BULK;
+    }
 
     @Override
     public List<InsertTask> createTasks(TaskType taskType, Map taskContext) {
-        //nothing need to do in insert job
-        InsertTask task = new InsertTask(null, getCurrentDbName(), getExecuteSql(), getCreateUser());
-        task.setJobId(getJobId());
-        task.setTaskType(taskType);
-        task.setTaskId(Env.getCurrentEnv().getNextId());
+        if (plans.isEmpty()) {
+            InsertTask task = new InsertTask(labelName, getCurrentDbName(), getExecuteSql(), getCreateUser());
+            task.setJobId(getJobId());
+            task.setTaskType(taskType);
+            task.setTaskId(Env.getCurrentEnv().getNextId());
+            ArrayList<InsertTask> tasks = new ArrayList<>();
+            tasks.add(task);
+            super.initTasks(tasks);
+            addNewTask(task.getTaskId());
+            return tasks;
+        } else {
+            return createBatchTasks(taskType);
+        }
+    }
+
+    private List<InsertTask> createBatchTasks(TaskType taskType) {
         ArrayList<InsertTask> tasks = new ArrayList<>();
-        tasks.add(task);
-        super.initTasks(tasks);
-        addNewTask(task.getTaskId());
-        return tasks;
+        for (InsertIntoTableCommand logicalPlan : plans) {
+            InsertTask task = new InsertTask(logicalPlan, ctx, stmtExecutor, loadStatistic);
+            task.setJobId(getJobId());
+            task.setTaskType(taskType);
+            idToTasks.put(task.getTaskId(), task);
+            initTasks(tasks);
+        }
+        return new ArrayList<>(idToTasks.values());
     }
 
     public void addNewTask(long id) {

Review Comment:
   ```suggestion
       private void addNewTask(long id) {
   ```
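The suggestion narrows `addNewTask` to `private`. Its body is not quoted here. The following is a hypothetical sketch of a private, bounded version, assuming it appends to `taskIdList` and keeps at most `MAX_SAVE_TASK_NUM` (100) ids. Both names appear in the hunk, but the eviction policy is an assumption.

```java
import java.util.concurrent.ConcurrentLinkedQueue;

public final class TaskIdHistorySketch {
    private static final int MAX_SAVE_TASK_NUM = 100;
    private final ConcurrentLinkedQueue<Long> taskIdList = new ConcurrentLinkedQueue<>();

    // Package-private entry point standing in for createTasks().
    void createTasks(int count, long firstId) {
        for (int i = 0; i < count; i++) {
            addNewTask(firstId + i);
        }
    }

    // Private: only the job's own task-creation paths should record ids.
    private void addNewTask(long id) {
        taskIdList.add(id);
        // Evict the oldest ids beyond the cap. size() is O(n) on this queue,
        // which is acceptable for a cap of 100.
        while (taskIdList.size() > MAX_SAVE_TASK_NUM) {
            taskIdList.poll();
        }
    }

    int savedCount() {
        return taskIdList.size();
    }

    Long oldestSaved() {
        return taskIdList.peek();
    }
}
```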



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertIntoTableCommand.java:
##########
@@ -101,6 +105,14 @@ public void setJobId(long jobId) {
 
     @Override
     public void run(ConnectContext ctx, StmtExecutor executor) throws Exception {
+        runInternal(ctx, executor);
+    }
+
+    public void statefulRun(ConnectContext ctx, StmtExecutor executor) throws Exception {

Review Comment:
   Looks like `statefulRun()` is the same as `run()`?



##########
fe/fe-core/src/main/java/org/apache/doris/load/DataTransFormMgr.java:
##########
@@ -0,0 +1,42 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load;
+
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+public abstract class DataTransFormMgr {

Review Comment:
   What is this class for?
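The quoted part of `DataTransFormMgr` shows only that it imports `ReentrantReadWriteLock`. `LoadMgr`, which extends it, calls `readLock()`, `readUnlock()`, `writeLock()` and `writeUnlock()`. One plausible reading is a shared locking base class. The sketch below is an assumption built from those two facts, not the actual file. The fair-mode flag is borrowed from the lock `InsertJob` creates.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

public abstract class LockedMgrSketch {
    // Fair mode is an assumption, mirroring the fair lock InsertJob creates.
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);

    protected void readLock() {
        lock.readLock().lock();
    }

    protected void readUnlock() {
        lock.readLock().unlock();
    }

    protected void writeLock() {
        lock.writeLock().lock();
    }

    protected void writeUnlock() {
        lock.writeLock().unlock();
    }

    protected boolean isWriteLockHeld() {
        return lock.isWriteLockedByCurrentThread();
    }

    // Example subclass following the lock/try/finally pattern LoadMgr uses.
    static final class CounterMgr extends LockedMgrSketch {
        private long count;

        void increment() {
            writeLock();
            try {
                count++;
            } finally {
                writeUnlock();
            }
        }

        long get() {
            readLock();
            try {
                return count;
            } finally {
                readUnlock();
            }
        }
    }
}
```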



##########
fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java:
##########
@@ -75,55 +118,147 @@ public class InsertJob extends AbstractJob<InsertTask, Map> {
         COLUMN_TO_INDEX = builder.build();
     }
 
-    @SerializedName(value = "lp")
-    String labelPrefix;
+    @SerializedName("taskIdList")
+    ConcurrentLinkedQueue<Long> taskIdList;
 
-    InsertIntoTableCommand command;
+    private final long dbId;
+    private String labelName;
+    private List<InsertIntoTableCommand> plans;
+    private InsertJob.LoadType loadType;
+    // 0: the job status is pending
+    // n/100: n is the number of task which has been finished
+    // 99: all tasks have been finished
+    // 100: txn status is visible and load has been finished
+    private int progress;
+    private long createTimestamp = System.currentTimeMillis();
+    private FailMsg failMsg;
+    private LoadStatistic loadStatistic = new LoadStatistic();
+    private Set<Long> finishedTaskIds = new HashSet<>();
+    private Set<String> tableNames;
+    private ConcurrentHashMap<Long, InsertTask> idToTasks = new ConcurrentHashMap<>();
+    private Map<String, String> properties;
+    private AuthorizationInfo authorizationInfo;
+    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
+
+    private ConnectContext ctx;
+    private StmtExecutor stmtExecutor;
+    private List<ErrorTabletInfo> errorTabletInfos = new ArrayList<>();
+    private List<TabletCommitInfo> commitInfos = new ArrayList<>();
 
-    StmtExecutor stmtExecutor;
+    // max save task num, do we need to config it?
+    private static final int MAX_SAVE_TASK_NUM = 100;
 
-    ConnectContext ctx;
+    /**
+     * load job type
+     */
+    public enum LoadType {
+        BULK,
+        SPARK,
+        LOCAL_FILE,
+        UNKNOWN
 
-    @SerializedName("tis")
-    ConcurrentLinkedQueue<Long> taskIdList;
+    }
 
-    // max save task num, do we need to config it?
-    private static final int MAX_SAVE_TASK_NUM = 100;
+    public InsertJob(ReplayLoadLog.ReplayCreateLoadLog replayLoadLog) {
+        super(replayLoadLog.getId());
+        setJobId(replayLoadLog.getId());
+        this.dbId = replayLoadLog.getDbId();
+    }
+
+    public InsertJob(Long jobId, String jobName,
+                     JobStatus jobStatus,
+                     String currentDbName,
+                     String comment,
+                     UserIdentity createUser,
+                     JobExecutionConfiguration jobConfig,
+                     Long createTimeMs,
+                     String executeSql) {
+        super(jobId, jobName, jobStatus, currentDbName, comment, createUser,
+                jobConfig, createTimeMs, executeSql, null);
+        this.dbId = ConnectContext.get().getCurrentDbId();
+    }
+
+    public InsertJob(ConnectContext ctx,
+                      StmtExecutor executor,
+                      String labelName,
+                      List<InsertIntoTableCommand> plans,
+                      Set<String> sinkTableNames,
+                      Map<String, String> properties,
+                      String comment,
+                      JobExecutionConfiguration jobConfig) {
+        super(Env.getCurrentEnv().getNextId(), labelName, JobStatus.RUNNING, null,
+                comment, ctx.getCurrentUserIdentity(), jobConfig);
+        this.ctx = ctx;
+        this.plans = plans;
+        this.stmtExecutor = executor;
+        this.dbId = ctx.getCurrentDbId();
+        this.labelName = labelName;
+        this.tableNames = sinkTableNames;
+        this.properties = properties;
+        // TODO: not support other type yet
+        this.loadType = InsertJob.LoadType.BULK;
+    }
 
     @Override
     public List<InsertTask> createTasks(TaskType taskType, Map taskContext) {
-        //nothing need to do in insert job
-        InsertTask task = new InsertTask(null, getCurrentDbName(), getExecuteSql(), getCreateUser());
-        task.setJobId(getJobId());
-        task.setTaskType(taskType);
-        task.setTaskId(Env.getCurrentEnv().getNextId());
+        if (plans.isEmpty()) {
+            InsertTask task = new InsertTask(labelName, getCurrentDbName(), getExecuteSql(), getCreateUser());
+            task.setJobId(getJobId());
+            task.setTaskType(taskType);
+            task.setTaskId(Env.getCurrentEnv().getNextId());
+            ArrayList<InsertTask> tasks = new ArrayList<>();
+            tasks.add(task);
+            super.initTasks(tasks);
+            addNewTask(task.getTaskId());
+            return tasks;
+        } else {
+            return createBatchTasks(taskType);
+        }
+    }
+
+    private List<InsertTask> createBatchTasks(TaskType taskType) {

Review Comment:
   Why not call `addNewTask()` in `createBatchTasks()`?
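In the quoted hunk, `createBatchTasks()` never adds to `tasks`, so `initTasks(tasks)` receives an empty list on every iteration. The method also returns `idToTasks.values()`, whose iteration order a `ConcurrentHashMap` does not define. The sketch below shows one shape that records each id the way the single-task branch does and calls `initTasks()` once. `Task`, the id counter, and the accessors are hypothetical stand-ins.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;

public final class BatchTaskSketch {
    // Hypothetical stand-in for InsertTask: an id plus the plan it will run.
    static final class Task {
        final long id;
        final String plan;

        Task(long id, String plan) {
            this.id = id;
            this.plan = plan;
        }
    }

    private final AtomicLong nextId = new AtomicLong(1); // stand-in for Env.getNextId()
    private final Map<Long, Task> idToTasks = new ConcurrentHashMap<>();
    private final ConcurrentLinkedQueue<Long> taskIdList = new ConcurrentLinkedQueue<>();
    private final List<Task> initialized = new ArrayList<>(); // what initTasks() received

    List<Task> createBatchTasks(List<String> plans) {
        List<Task> tasks = new ArrayList<>();
        for (String plan : plans) {
            Task task = new Task(nextId.getAndIncrement(), plan);
            idToTasks.put(task.id, task);
            tasks.add(task);     // populate the list handed to initTasks()
            addNewTask(task.id); // same bookkeeping as the single-task branch
        }
        initTasks(tasks);        // once, after the loop, with every new task
        return tasks;            // only this batch, in plan order
    }

    private void addNewTask(long id) {
        taskIdList.add(id);
    }

    private void initTasks(List<Task> tasks) {
        initialized.addAll(tasks);
    }

    int savedTaskIdCount() {
        return taskIdList.size();
    }

    int initializedCount() {
        return initialized.size();
    }
}
```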





Re: [PR] [feature](Load)(step2)support nereids load job schedule [doris]

Posted by "wsjz (via GitHub)" <gi...@apache.org>.
wsjz commented on PR #26356:
URL: https://github.com/apache/doris/pull/26356#issuecomment-1855388785

   run buildall


Re: [PR] [feature](Load)(step2)support nereids load job schedule [doris]

Posted by "doris-robot (via GitHub)" <gi...@apache.org>.
doris-robot commented on PR #26356:
URL: https://github.com/apache/doris/pull/26356#issuecomment-1855426099

   
   <details>
   <summary>TPC-H test result on machine: 'aliyun_ecs.c7a.8xlarge_32C64G'</summary>
   
   ```
   Tpch sf100 test result on commit c3ecbe8b5077a06b1ebdb7f378765228625ae569, data reload: false
   
   run tpch-sf100 query with default conf and session variables
   q1	4754	4480	4477	4477
   q2	364	152	158	152
   q3	1477	1259	1216	1216
   q4	1113	903	911	903
   q5	3141	3167	3189	3167
   q6	247	130	129	129
   q7	979	486	484	484
   q8	2218	2194	2199	2194
   q9	6682	6654	6646	6646
   q10	3219	3273	3255	3255
   q11	321	204	201	201
   q12	351	211	212	211
   q13	4557	3815	3786	3786
   q14	249	211	211	211
   q15	580	523	523	523
   q16	440	387	387	387
   q17	1008	595	528	528
   q18	7174	6858	6870	6858
   q19	1530	1378	1414	1378
   q20	520	320	292	292
   q21	3069	2627	2676	2627
   q22	350	290	292	290
   Total cold run time: 44343 ms
   Total hot run time: 39915 ms
   
   run tpch-sf100 query with default conf and set session variable runtime_filter_mode=off
   q1	4398	4423	4388	4388
   q2	268	165	168	165
   q3	3532	3535	3520	3520
   q4	2403	2386	2385	2385
   q5	5744	5719	5776	5719
   q6	240	121	125	121
   q7	2406	1917	1836	1836
   q8	3517	3525	3527	3525
   q9	9038	9037	8981	8981
   q10	3932	4003	4022	4003
   q11	506	384	379	379
   q12	769	592	594	592
   q13	4280	3564	3549	3549
   q14	284	256	260	256
   q15	574	523	522	522
   q16	502	466	461	461
   q17	1866	1863	1865	1863
   q18	8636	8267	8319	8267
   q19	1757	1747	1745	1745
   q20	2269	1968	1932	1932
   q21	6515	6179	6179	6179
   q22	494	414	428	414
   Total cold run time: 63930 ms
   Total hot run time: 60802 ms
   ```
   </details>
   


Re: [PR] [feature](Load)(step2)support nereids load job schedule [doris]

Posted by "wsjz (via GitHub)" <gi...@apache.org>.
wsjz commented on PR #26356:
URL: https://github.com/apache/doris/pull/26356#issuecomment-1858744908

   run p0


Re: [PR] [feature](Load)(step2)support nereids load job schedule [doris]

Posted by "doris-robot (via GitHub)" <gi...@apache.org>.
doris-robot commented on PR #26356:
URL: https://github.com/apache/doris/pull/26356#issuecomment-1859517070

   
   TPC-H test result on machine: 'aliyun_ecs.c7a.8xlarge_32C64G'
   ```
   Tpch sf100 test result on commit db2cd09edf366d66eb55caae3f4fd6d5f0b49ea2, data reload: false
   
   run tpch-sf100 query with default conf and session variables
   q1	4749	4506	4497	4497
   q2	363	151	159	151
   q3	1463	1253	1198	1198
   q4	1121	925	901	901
   q5	3165	3166	3127	3127
   q6	247	128	130	128
   q7	986	486	475	475
   q8	2219	2216	2187	2187
   q9	6659	6650	6655	6650
   q10	3201	3259	3249	3249
   q11	329	203	209	203
   q12	354	214	216	214
   q13	4566	3800	3871	3800
   q14	241	218	209	209
   q15	570	537	524	524
   q16	439	392	390	390
   q17	1005	648	577	577
   q18	7218	7034	6934	6934
   q19	1552	1494	1363	1363
   q20	537	319	300	300
   q21	3040	2634	2664	2634
   q22	346	283	281	281
   Total cold run time: 44370 ms
   Total hot run time: 39992 ms
   
   run tpch-sf100 query with default conf and set session variable runtime_filter_mode=off
   q1	4490	4396	4434	4396
   q2	267	161	171	161
   q3	3549	3527	3530	3527
   q4	2392	2386	2382	2382
   q5	5721	5740	5729	5729
   q6	244	122	123	122
   q7	2369	1849	1868	1849
   q8	3520	3514	3512	3512
   q9	9035	9007	9011	9007
   q10	3904	3959	3996	3959
   q11	506	392	386	386
   q12	771	596	595	595
   q13	4317	3546	3570	3546
   q14	283	259	259	259
   q15	568	518	525	518
   q16	514	471	481	471
   q17	1861	1872	1829	1829
   q18	8648	8167	8298	8167
   q19	1724	1758	1758	1758
   q20	2254	1945	1937	1937
   q21	6563	6211	6155	6155
   q22	504	428	437	428
   Total cold run time: 64004 ms
   Total hot run time: 60693 ms
   ```
   


Re: [PR] [feature](Load)(step2)support nereids load job schedule [doris]

Posted by "wsjz (via GitHub)" <gi...@apache.org>.
wsjz commented on PR #26356:
URL: https://github.com/apache/doris/pull/26356#issuecomment-1859516166

   run buildall


Re: [PR] [feature](Load)(step2)support nereids load job schedule [doris]

Posted by "wsjz (via GitHub)" <gi...@apache.org>.
wsjz commented on PR #26356:
URL: https://github.com/apache/doris/pull/26356#issuecomment-1859649578

   run buildall


Re: [PR] [feature](Load)(step2)support nereids load job schedule [doris]

Posted by "wsjz (via GitHub)" <gi...@apache.org>.
wsjz commented on code in PR #26356:
URL: https://github.com/apache/doris/pull/26356#discussion_r1427690881


##########
fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java:
##########
@@ -65,58 +100,171 @@ public class InsertJob extends AbstractJob<InsertTask, Map> {
             new Column("CreateTime", ScalarType.createStringType()),
             new Column("Comment", ScalarType.createStringType()));
 
+    private static final ShowResultSetMetaData TASK_META_DATA =
+            ShowResultSetMetaData.builder()
+                    .addColumn(new Column("TaskId", ScalarType.createVarchar(20)))
+                    .addColumn(new Column("Label", ScalarType.createVarchar(20)))
+                    .addColumn(new Column("Status", ScalarType.createVarchar(20)))
+                    .addColumn(new Column("EtlInfo", ScalarType.createVarchar(20)))
+                    .addColumn(new Column("TaskInfo", ScalarType.createVarchar(20)))
+                    .addColumn(new Column("ErrorMsg", ScalarType.createVarchar(20)))
+
+                    .addColumn(new Column("CreateTimeMs", ScalarType.createVarchar(20)))
+                    .addColumn(new Column("FinishTimeMs", ScalarType.createVarchar(20)))
+                    .addColumn(new Column("TrackingUrl", ScalarType.createVarchar(20)))
+                    .addColumn(new Column("LoadStatistic", ScalarType.createVarchar(20)))
+                    .addColumn(new Column("User", ScalarType.createVarchar(20)))
+                    .build();
+
     public static final ImmutableMap<String, Integer> COLUMN_TO_INDEX;
 
     static {
-        ImmutableMap.Builder<String, Integer> builder = new ImmutableMap.Builder();
+        ImmutableMap.Builder<String, Integer> builder = new ImmutableMap.Builder<>();
         for (int i = 0; i < SCHEMA.size(); i++) {
             builder.put(SCHEMA.get(i).getName().toLowerCase(), i);
         }
         COLUMN_TO_INDEX = builder.build();
     }
 
-    @SerializedName(value = "lp")
-    String labelPrefix;
+    @SerializedName("taskIdList")
+    ConcurrentLinkedQueue<Long> taskIdList;
+    @SerializedName("dbId")
+    private final long dbId;
+    @SerializedName("labelName")
+    private String labelName;
+    @SerializedName("loadType")
+    private InsertJob.LoadType loadType;
+    // 0: the job status is pending
+    // n/100: n is the number of task which has been finished
+    // 99: all tasks have been finished
+    // 100: txn status is visible and load has been finished
+    @SerializedName("progress")
+    private int progress;
+    @SerializedName("failMsg")
+    private FailMsg failMsg;
+    @SerializedName("plans")
+    private List<InsertIntoTableCommand> plans;
+    private LoadStatistic loadStatistic = new LoadStatistic();
+    private Set<Long> finishedTaskIds = new HashSet<>();
+    private Set<String> tableNames;
+    private ConcurrentHashMap<Long, InsertTask> idToTasks = new ConcurrentHashMap<>();
+    private Map<String, String> properties;
+    private AuthorizationInfo authorizationInfo;
+    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
+
+    private ConnectContext ctx;
+    private StmtExecutor stmtExecutor;
+    private List<ErrorTabletInfo> errorTabletInfos = new ArrayList<>();
+    private List<TabletCommitInfo> commitInfos = new ArrayList<>();
+
+    // max save task num, do we need to config it?
+    private static final int MAX_SAVE_TASK_NUM = 100;
+
+    /**
+     * load job type
+     */
+    public enum LoadType {
+        BULK,
+        SPARK,
+        LOCAL_FILE,
+        UNKNOWN
+
+    }
+
+    public enum Priority {
+        HIGH(0),
+        NORMAL(1),
+        LOW(2);
 
-    InsertIntoTableCommand command;
+        Priority(int value) {
+            this.value = value;
+        }
 
-    StmtExecutor stmtExecutor;
+        private final int value;
 
-    ConnectContext ctx;
+        public int getValue() {
+            return value;
+        }
+    }
 
-    @SerializedName("tis")
-    ConcurrentLinkedQueue<Long> taskIdList;
+    public InsertJob(Long jobId, String jobName,
+                     JobStatus jobStatus,
+                     LabelName labelName,
+                     String comment,
+                     UserIdentity createUser,
+                     JobExecutionConfiguration jobConfig,
+                     Long createTimeMs,
+                     String executeSql) {
+        super(jobId, jobName, jobStatus, labelName.getDbName(), comment, createUser,
+                jobConfig, createTimeMs, executeSql, null);
+        this.dbId = ConnectContext.get().getCurrentDbId();
+        this.labelName = labelName.getLabelName();
+    }
 
-    // max save task num, do we need to config it?
-    private static final int MAX_SAVE_TASK_NUM = 100;
+    public InsertJob(ConnectContext ctx,
+                      StmtExecutor executor,
+                      String labelName,
+                      List<InsertIntoTableCommand> plans,
+                      Set<String> sinkTableNames,
+                      Map<String, String> properties,
+                      String comment,
+                      JobExecutionConfiguration jobConfig) {
+        super(Env.getCurrentEnv().getNextId(), labelName, JobStatus.RUNNING, null,
+                comment, ctx.getCurrentUserIdentity(), jobConfig);
+        this.ctx = ctx;
+        this.plans = plans;
+        this.stmtExecutor = executor;
+        this.dbId = ctx.getCurrentDbId();
+        this.labelName = labelName;
+        this.tableNames = sinkTableNames;
+        this.properties = properties;
+        // TODO: not support other type yet
+        this.loadType = InsertJob.LoadType.BULK;
+    }
 
     @Override
-    public List<InsertTask> createTasks(TaskType taskType, Map taskContext) {
-        //nothing need to do in insert job
-        InsertTask task = new InsertTask(null, getCurrentDbName(), getExecuteSql(), getCreateUser());
-        task.setJobId(getJobId());
-        task.setTaskType(taskType);
-        task.setTaskId(Env.getCurrentEnv().getNextId());
-        ArrayList<InsertTask> tasks = new ArrayList<>();
-        tasks.add(task);
-        super.initTasks(tasks);
-        addNewTask(task.getTaskId());
-        return tasks;
+    public List<InsertTask> createTasks(TaskType taskType, Map<Object, Object> taskContext) {
+        if (plans.isEmpty()) {
+            InsertTask task = new InsertTask(labelName, getCurrentDbName(), getExecuteSql(), getCreateUser());
+            task.setTaskType(taskType);
+            task.setJobId(getJobId());
+            task.setCreateTimeMs(System.currentTimeMillis());
+            task.setStatus(TaskStatus.PENDING);
+            ArrayList<InsertTask> tasks = new ArrayList<>();
+            tasks.add(task);
+            super.initTasks(tasks);
+            addNewTask(task.getTaskId());
+            return tasks;
+        } else {
+            return createBatchTasks(taskType);
+        }
     }
 
-    public void addNewTask(long id) {
+    private List<InsertTask> createBatchTasks(TaskType taskType) {

Review Comment:
   Will refactor this in the future, because this method has many conflicts.
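   The `progress` field in the quoted `InsertJob` diff is documented as follows: 0 while the job is pending, a finished-task fraction while tasks run, 99 once all tasks have finished, and 100 once the transaction is visible. The "n/100" wording can be read in more than one way. Below is a minimal, hypothetical sketch of one reading. It assumes that the finished-task count is scaled to a percentage and capped at 99 until the transaction becomes visible. The helper name `progress` and its parameters are illustrative only, and Doris's actual computation may differ.

```java
// Hypothetical helper illustrating one reading of the documented progress encoding.
public class ProgressSketch {
    /**
     * Returns 0 when nothing has finished, finished/total scaled to a
     * percentage but capped at 99 while the transaction is not yet visible,
     * and 100 only once the transaction is visible.
     */
    static int progress(int finishedTasks, int totalTasks, boolean txnVisible) {
        if (txnVisible) {
            return 100;
        }
        if (totalTasks <= 0 || finishedTasks <= 0) {
            return 0;
        }
        int scaled = (int) ((long) finishedTasks * 100 / totalTasks);
        return Math.min(scaled, 99);
    }

    public static void main(String[] args) {
        System.out.println(progress(0, 4, false)); // 0
        System.out.println(progress(2, 4, false)); // 50
        System.out.println(progress(4, 4, false)); // 99
        System.out.println(progress(4, 4, true));  // 100
    }
}
```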



Re: [PR] [feature](Load)(step2)support nereids load job schedule [doris]

Posted by "wsjz (via GitHub)" <gi...@apache.org>.
wsjz commented on code in PR #26356:
URL: https://github.com/apache/doris/pull/26356#discussion_r1427703299


##########
fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/load/LoadMgr.java:
##########
@@ -0,0 +1,340 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.jobs.load;
+
+import org.apache.doris.analysis.CompoundPredicate;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.CaseSensibility;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.LabelAlreadyUsedException;
+import org.apache.doris.common.PatternMatcher;
+import org.apache.doris.common.PatternMatcherWrapper;
+import org.apache.doris.common.util.LogBuilder;
+import org.apache.doris.common.util.LogKey;
+import org.apache.doris.job.base.JobExecuteType;
+import org.apache.doris.job.base.JobExecutionConfiguration;
+import org.apache.doris.job.common.JobStatus;
+import org.apache.doris.job.exception.JobException;
+import org.apache.doris.job.extensions.insert.InsertJob;
+import org.apache.doris.job.manager.JobManager;
+import org.apache.doris.load.loadv2.JobState;
+import org.apache.doris.nereids.jobs.load.replay.ReplayLoadLog;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+
+/**
+ * load manager
+ */
+public class LoadMgr {
+    private static final Logger LOG = LogManager.getLogger(LoadMgr.class);
+    private Map<Long, InsertJob> loadIdToJob = new HashMap<>();
+    private final Map<Long, Map<String, List<InsertJob>>> dbIdToLabelToLoadJobs = new ConcurrentHashMap<>();
+
+    // lock for export job
+    // lock is private and must use after db lock
+    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
+
+    private void readLock() {
+        lock.readLock().lock();
+    }
+
+    private void readUnlock() {
+        lock.readLock().unlock();
+    }
+
+    private void writeLock() {
+        lock.writeLock().lock();
+    }
+
+    private void writeUnlock() {
+        lock.writeLock().unlock();
+    }
+
+    private JobManager<InsertJob, ?> getJobManager() {
+        return Env.getCurrentEnv().getJobManager();
+    }
+
+    /**
+     * add load job and add tasks
+     * @param loadJob job
+     */
+    public void addLoadJob(InsertJob loadJob) throws DdlException {
+        writeLock();
+        try {
+            Map<String, List<InsertJob>> labelToLoadJobs = dbIdToLabelToLoadJobs.get(loadJob.getDbId());
+            if (labelToLoadJobs != null && labelToLoadJobs.containsKey(loadJob.getLabel())) {
+                throw new LabelAlreadyUsedException(loadJob.getLabel());
+            }
+            unprotectAddJob(loadJob);
+        } catch (LabelAlreadyUsedException e) {
+            throw new RuntimeException(e);
+        } finally {
+            writeUnlock();
+        }
+    }
+
+    private void unprotectAddJob(InsertJob job) throws DdlException {
+        loadIdToJob.put(job.getJobId(), job);
+        try {
+            getJobManager().registerJob(job);
+            if (!dbIdToLabelToLoadJobs.containsKey(job.getDbId())) {
+                dbIdToLabelToLoadJobs.put(job.getDbId(), new ConcurrentHashMap<>());
+            }
+            Map<String, List<InsertJob>> labelToLoadJobs = dbIdToLabelToLoadJobs.get(job.getDbId());
+            if (!labelToLoadJobs.containsKey(job.getLabel())) {
+                labelToLoadJobs.put(job.getLabel(), new ArrayList<>());
+            }
+            labelToLoadJobs.get(job.getLabel()).add(job);
+        } catch (org.apache.doris.job.exception.JobException e) {
+            throw new DdlException(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * replay load job
+     * @param replayLoadLog load log
+     * @throws DdlException ex
+     */
+    public void replayLoadJob(ReplayLoadLog replayLoadLog) throws DdlException {
+        writeLock();
+        try {
+            if (replayLoadLog instanceof ReplayLoadLog.ReplayCreateLoadLog) {
+                InsertJob loadJob = new InsertJob((ReplayLoadLog.ReplayCreateLoadLog) replayLoadLog);
+                JobExecutionConfiguration jobConfig = new JobExecutionConfiguration();
+                jobConfig.setExecuteType(JobExecuteType.INSTANT);
+                loadJob.setJobConfig(jobConfig);
+                addLoadJob(loadJob);
+                LOG.info(new LogBuilder(LogKey.LOAD_JOB, loadJob.getJobId()).add("msg", "replay create load job")
+                        .build());
+            } else if (replayLoadLog instanceof ReplayLoadLog.ReplayEndLoadLog) {
+                InsertJob job = loadIdToJob.get(replayLoadLog.getId());
+                if (job == null) {
+                    // This should not happen.
+                    // Last time I found that when user submit a job with already used label, an END_LOAD_JOB edit log
+                    // will be written but the job is not added to 'idToLoadJob', so this job here we got will be null.
+                    // And this bug has been fixed.
+                    // Just add a log here to observe.
+                    LOG.warn("job does not exist when replaying end load job edit log: {}", replayLoadLog);
+                    return;
+                }
+                job.unprotectReadEndOperation((ReplayLoadLog.ReplayEndLoadLog) replayLoadLog);
+                LOG.info(new LogBuilder(LogKey.LOAD_JOB, replayLoadLog.getId()).add("operation", replayLoadLog)
+                        .add("msg", "replay end load job").build());
+            } else {
+                throw new DdlException("Unsupported replay job type. ");
+            }
+        } finally {
+            writeUnlock();
+        }
+    }
+
+    //    public void updateJobProgress(Long jobId, Long beId, TUniqueId loadId, TUniqueId fragmentId, long scannedRows,
+    //                                  long scannedBytes, boolean isDone) {
+    //        LoadJobExecutor job = loadIdToJob.get(jobId);
+    //        if (job != null) {
+    //            job.updateLoadingStatus(beId, loadId, fragmentId, scannedRows, scannedBytes, isDone);
+    //        }
+    //    }
+
+    /**
+     * cancel job
+     *
+     * @param dbName   dbName
+     * @param label    job label
+     * @param state    job state
+     * @param operator filter operator, like or equals
+     */
+    public void cancelLoadJob(String dbName, String label, String state, CompoundPredicate.Operator operator)
+            throws JobException, AnalysisException, DdlException {
+        Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(dbName);
+        // List of load jobs waiting to be cancelled
+        List<InsertJob> uncompletedLoadJob;
+        readLock();
+        try {
+            Map<String, List<InsertJob>> labelToLoadJobs = dbIdToLabelToLoadJobs.get(db.getId());
+            if (labelToLoadJobs == null) {
+                throw new JobException("Load job does not exist");
+            }
+            List<InsertJob> matchLoadJobs = Lists.newArrayList();
+            addNeedCancelLoadJob(label, state, operator,
+                    labelToLoadJobs.values().stream().flatMap(Collection::stream).collect(Collectors.toList()),
+                    matchLoadJobs);
+            if (matchLoadJobs.isEmpty()) {
+                throw new JobException("Load job does not exist");
+            }
+            // check state here
+            uncompletedLoadJob =
+                    matchLoadJobs.stream().filter(InsertJob::isRunning)
+                            .collect(Collectors.toList());
+            if (uncompletedLoadJob.isEmpty()) {
+                throw new JobException("There is no uncompleted job");
+            }
+        } finally {
+            readUnlock();
+        }
+        for (InsertJob loadJob : uncompletedLoadJob) {
+            try {
+                loadJob.cancelJob();
+            } catch (JobException e) {
+                LOG.warn("Fail to cancel job, its label: {}", loadJob.getLabel());
+            }
+        }
+    }
+
+    private static void addNeedCancelLoadJob(String label, String state,

Review Comment:
   Not required, but I suggest making a method static if it doesn't use any instance fields.
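   The suggestion above applies to a filter such as `addNeedCancelLoadJob`, which works only on its arguments. Below is a minimal sketch of such a static helper. It uses hypothetical simplified types (`Job`, and an `Operator` enum standing in for `CompoundPredicate.Operator`). As a simplification, labels are compared by exact equality, whereas the quoted code's javadoc mentions "like or equals" matching. A `null` filter means "not specified".

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simplified types; not the Doris InsertJob or CompoundPredicate classes.
public class CancelFilterSketch {
    enum Operator { AND, OR }

    static final class Job {
        final String label;
        final String state;

        Job(String label, String state) {
            this.label = label;
            this.state = state;
        }
    }

    /**
     * Collects jobs matching the label and/or state filters. The method reads
     * only its parameters and no instance fields, so it can be static.
     */
    static List<Job> matchJobs(String label, String state, Operator op, List<Job> jobs) {
        List<Job> matched = new ArrayList<>();
        for (Job job : jobs) {
            boolean labelOk = label == null || label.equals(job.label);
            boolean stateOk = state == null || state.equalsIgnoreCase(job.state);
            boolean keep;
            if (label != null && state != null) {
                // Both filters given: combine them with the operator.
                keep = op == Operator.AND ? (labelOk && stateOk) : (labelOk || stateOk);
            } else {
                // At most one filter given: the unspecified one always passes.
                keep = labelOk && stateOk;
            }
            if (keep) {
                matched.add(job);
            }
        }
        return matched;
    }
}
```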



Re: [PR] [feature](Load)(step2)support nereids load job schedule [doris]

Posted by "wsjz (via GitHub)" <gi...@apache.org>.
wsjz commented on PR #26356:
URL: https://github.com/apache/doris/pull/26356#issuecomment-1862002157

   run clickbench


Re: [PR] [feature](Load)(step2)support nereids load job schedule [doris]

Posted by "wsjz (via GitHub)" <gi...@apache.org>.
wsjz commented on PR #26356:
URL: https://github.com/apache/doris/pull/26356#issuecomment-1862002061

   run p0


Re: [PR] [feature](Load)(step2)support nereids load job schedule [doris]

Posted by "morningman (via GitHub)" <gi...@apache.org>.
morningman commented on code in PR #26356:
URL: https://github.com/apache/doris/pull/26356#discussion_r1427583758


##########
fe/fe-common/src/main/java/org/apache/doris/common/Config.java:
##########
@@ -1899,6 +1899,14 @@ public class Config extends ConfigBase {
     @ConfField(masterOnly = true)
     public static boolean enable_hms_events_incremental_sync = false;
 
+    /**
+     * If set to true, doris will try to parse the ddl of a hive view and try to execute the query
+     * otherwise it will throw an AnalysisException.
+     */
+    @ConfField(mutable = true, varType = VariableAnnotation.EXPERIMENTAL)

Review Comment:
   add `description` field
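   The request above asks for a `description` on the new config entry. The sketch below shows why one helps: a description stored on the annotation can be read back by reflection, for example to generate configuration docs. It is self-contained and uses hypothetical stand-ins. `ConfFieldSketch`, `SketchConfig`, and `hypothetical_hive_view_flag` are not Doris's `@ConfField` or its real config name, which the quoted hunk does not show.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;

public class ConfDescriptionSketch {
    // Hypothetical stand-in for a config annotation; not Doris's @ConfField.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.FIELD)
    @interface ConfFieldSketch {
        boolean mutable() default false;

        String description() default "";
    }

    static class SketchConfig {
        @ConfFieldSketch(mutable = true,
                description = "If true, try to parse and execute a Hive view's DDL; "
                        + "otherwise throw an AnalysisException.")
        public static boolean hypothetical_hive_view_flag = false;
    }

    /** Returns the description declared on a public config field, or "" if absent. */
    static String describe(Class<?> cls, String fieldName) {
        try {
            Field f = cls.getField(fieldName);
            ConfFieldSketch ann = f.getAnnotation(ConfFieldSketch.class);
            return ann == null ? "" : ann.description();
        } catch (NoSuchFieldException e) {
            return "";
        }
    }

    public static void main(String[] args) {
        System.out.println(describe(SketchConfig.class, "hypothetical_hive_view_flag"));
    }
}
```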



##########
fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java:
##########
@@ -54,35 +54,73 @@
 @Log4j2
 public abstract class AbstractJob<T extends AbstractTask, C> implements Job<T, C>, Writable {
 
-    @SerializedName(value = "jid")
+    @SerializedName(value = "jobId")
     private Long jobId;
 
-    @SerializedName(value = "jn")
+    @SerializedName(value = "jobName")
     private String jobName;
 
-    @SerializedName(value = "js")
+    @SerializedName(value = "jobStatus")
     private JobStatus jobStatus;
 
-    @SerializedName(value = "cdb")
+    @SerializedName(value = "currentDbName")
     private String currentDbName;
 
-    @SerializedName(value = "c")
+    @SerializedName(value = "comment")
     private String comment;
 
-    @SerializedName(value = "cu")
+    @SerializedName(value = "createUser")
     private UserIdentity createUser;
 
-    @SerializedName(value = "jc")
+    @SerializedName(value = "jobConfig")
     private JobExecutionConfiguration jobConfig;
 
-    @SerializedName(value = "ctms")
-    private Long createTimeMs;
+    @SerializedName(value = "createTimeMs")
+    private Long createTimeMs = -1L;
+
+    @SerializedName(value = "startTimeMs")
+    private Long startTimeMs = -1L;
+
+    @SerializedName(value = "finishTimeMs")
+    private Long finishTimeMs = -1L;
 
     @SerializedName(value = "sql")
     String executeSql;
 
-    @SerializedName(value = "ftm")
-    private long finishTimeMs;
+    public AbstractJob() {}
+
+    public AbstractJob(Long id) {
+        setJobId(id);
+    }
+
+    public AbstractJob(Long jobId, String jobName, JobStatus jobStatus,

Review Comment:
   Add a comment to explain the difference between these two constructors.
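   Below is a minimal sketch of the kind of constructor documentation requested above. It uses a hypothetical `SketchJob`, not Doris's `AbstractJob`. The sketch assumes that one constructor is for rebuilding a job whose id already exists (for example, when restoring persisted metadata). It also assumes that the other constructor creates a new job and allocates a fresh id, much as the quoted `InsertJob` constructor passes `Env.getCurrentEnv().getNextId()` to `super`. A local counter stands in for that id source.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch; not Doris's AbstractJob or InsertJob.
public class ConstructorDocSketch {
    static final AtomicLong ID_GENERATOR = new AtomicLong(1000);

    static class SketchJob {
        private final long jobId;
        private final String jobName;

        /**
         * Rebuilds a job that already has an id (e.g. restored from persisted
         * metadata). No new id is allocated.
         */
        SketchJob(long existingId, String jobName) {
            this.jobId = existingId;
            this.jobName = jobName;
        }

        /**
         * Creates a brand-new job and allocates a fresh id from the generator.
         */
        SketchJob(String jobName) {
            this(ID_GENERATOR.getAndIncrement(), jobName);
        }

        long getJobId() {
            return jobId;
        }

        String getJobName() {
            return jobName;
        }
    }
}
```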



##########
fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java:
##########
@@ -65,58 +100,171 @@ public class InsertJob extends AbstractJob<InsertTask, Map> {
             new Column("CreateTime", ScalarType.createStringType()),
             new Column("Comment", ScalarType.createStringType()));
 
+    private static final ShowResultSetMetaData TASK_META_DATA =
+            ShowResultSetMetaData.builder()
+                    .addColumn(new Column("TaskId", ScalarType.createVarchar(20)))
+                    .addColumn(new Column("Label", ScalarType.createVarchar(20)))
+                    .addColumn(new Column("Status", ScalarType.createVarchar(20)))
+                    .addColumn(new Column("EtlInfo", ScalarType.createVarchar(20)))
+                    .addColumn(new Column("TaskInfo", ScalarType.createVarchar(20)))
+                    .addColumn(new Column("ErrorMsg", ScalarType.createVarchar(20)))
+
+                    .addColumn(new Column("CreateTimeMs", ScalarType.createVarchar(20)))
+                    .addColumn(new Column("FinishTimeMs", ScalarType.createVarchar(20)))
+                    .addColumn(new Column("TrackingUrl", ScalarType.createVarchar(20)))
+                    .addColumn(new Column("LoadStatistic", ScalarType.createVarchar(20)))
+                    .addColumn(new Column("User", ScalarType.createVarchar(20)))
+                    .build();
+
     public static final ImmutableMap<String, Integer> COLUMN_TO_INDEX;
 
     static {
-        ImmutableMap.Builder<String, Integer> builder = new ImmutableMap.Builder();
+        ImmutableMap.Builder<String, Integer> builder = new ImmutableMap.Builder<>();
         for (int i = 0; i < SCHEMA.size(); i++) {
             builder.put(SCHEMA.get(i).getName().toLowerCase(), i);
         }
         COLUMN_TO_INDEX = builder.build();
     }
 
-    @SerializedName(value = "lp")
-    String labelPrefix;
+    @SerializedName("taskIdList")
+    ConcurrentLinkedQueue<Long> taskIdList;
+    @SerializedName("dbId")
+    private final long dbId;
+    @SerializedName("labelName")
+    private String labelName;
+    @SerializedName("loadType")
+    private InsertJob.LoadType loadType;
+    // 0: the job status is pending
+    // n/100: n is the number of task which has been finished
+    // 99: all tasks have been finished
+    // 100: txn status is visible and load has been finished
+    @SerializedName("progress")
+    private int progress;
+    @SerializedName("failMsg")
+    private FailMsg failMsg;
+    @SerializedName("plans")
+    private List<InsertIntoTableCommand> plans;
+    private LoadStatistic loadStatistic = new LoadStatistic();
+    private Set<Long> finishedTaskIds = new HashSet<>();
+    private Set<String> tableNames;
+    private ConcurrentHashMap<Long, InsertTask> idToTasks = new ConcurrentHashMap<>();
+    private Map<String, String> properties;
+    private AuthorizationInfo authorizationInfo;
+    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
+
+    private ConnectContext ctx;
+    private StmtExecutor stmtExecutor;
+    private List<ErrorTabletInfo> errorTabletInfos = new ArrayList<>();
+    private List<TabletCommitInfo> commitInfos = new ArrayList<>();
+
+    // max save task num, do we need to config it?
+    private static final int MAX_SAVE_TASK_NUM = 100;
+
+    /**
+     * load job type
+     */
+    public enum LoadType {
+        BULK,
+        SPARK,
+        LOCAL_FILE,
+        UNKNOWN
+
+    }
+
+    public enum Priority {
+        HIGH(0),
+        NORMAL(1),
+        LOW(2);
 
-    InsertIntoTableCommand command;
+        Priority(int value) {
+            this.value = value;
+        }
 
-    StmtExecutor stmtExecutor;
+        private final int value;
 
-    ConnectContext ctx;
+        public int getValue() {
+            return value;
+        }
+    }
 
-    @SerializedName("tis")
-    ConcurrentLinkedQueue<Long> taskIdList;
+    public InsertJob(Long jobId, String jobName,
+                     JobStatus jobStatus,
+                     LabelName labelName,
+                     String comment,
+                     UserIdentity createUser,
+                     JobExecutionConfiguration jobConfig,
+                     Long createTimeMs,
+                     String executeSql) {
+        super(jobId, jobName, jobStatus, labelName.getDbName(), comment, createUser,
+                jobConfig, createTimeMs, executeSql, null);
+        this.dbId = ConnectContext.get().getCurrentDbId();
+        this.labelName = labelName.getLabelName();
+    }
 
-    // max save task num, do we need to config it?
-    private static final int MAX_SAVE_TASK_NUM = 100;
+    public InsertJob(ConnectContext ctx,
+                      StmtExecutor executor,
+                      String labelName,
+                      List<InsertIntoTableCommand> plans,
+                      Set<String> sinkTableNames,
+                      Map<String, String> properties,
+                      String comment,
+                      JobExecutionConfiguration jobConfig) {
+        super(Env.getCurrentEnv().getNextId(), labelName, JobStatus.RUNNING, null,

Review Comment:
   Job ids are generated in two different places: the first constructor receives the id from the caller, while the second constructor generates it internally.
   We should unify them.
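One way to unify this, sketched under the assumption of a single global id source: every path goes through one canonical constructor that takes the id, and only a factory method mints fresh ids. New jobs use the factory, and replayed jobs pass the id read from the edit log. `IdGenerator`, `SketchJob`, and `createNew` are hypothetical names for illustration, not Doris APIs. In Doris the id would presumably still come from `Env.getCurrentEnv().getNextId()`.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for a global id source such as Env.getCurrentEnv().getNextId().
final class IdGenerator {
    private final AtomicLong next = new AtomicLong(1000);

    long nextId() {
        return next.getAndIncrement();
    }
}

// Hypothetical job class: the constructor never generates an id itself,
// so the factory below is the only place new ids are minted.
final class SketchJob {
    private final long jobId;
    private final String label;

    // Canonical constructor, used both for new jobs and for replayed jobs.
    SketchJob(long jobId, String label) {
        this.jobId = jobId;
        this.label = label;
    }

    // Factory for brand-new jobs; replay code calls the constructor directly
    // with the id recovered from the edit log.
    static SketchJob createNew(IdGenerator ids, String label) {
        return new SketchJob(ids.nextId(), label);
    }

    long getJobId() {
        return jobId;
    }

    String getLabel() {
        return label;
    }
}
```

With this shape, a reader of the class only has to check one method to know where ids come from.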



##########
fe/fe-core/src/main/java/org/apache/doris/job/base/Job.java:
##########
@@ -99,11 +99,34 @@ public interface Job<T extends AbstractTask, C> {
 
     /**
      * Cancels all running tasks of this job.
-     *
      * @throws JobException If cancelling a running task fails.
      */
     void cancelAllTasks() throws JobException;
 
+    /**
+     * register job
+     * @throws JobException If register job failed.
+     */
+    void onRegister() throws JobException;
+
+    /**
+     * register job failed
+     * @throws JobException If failed.
+     */
+    void onRegisterFailed() throws JobException;
+
+    /**
+     * relay create job
+     * @throws JobException  If replay create failed.
+     */
+    void onReplayCreate() throws JobException;
+
+    /**
+     * relay finished or cancelled job

Review Comment:
   ```suggestion
        * replay finished or cancelled job
   ```



##########
fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java:
##########
@@ -236,4 +290,135 @@ public T getJob(Long jobId) {
         return jobMap.get(jobId);
     }
 
+
+    /**
+     * get load info by db
+     *
+     * @param dbId          db id
+     * @param dbName        db name
+     * @param labelValue    label name
+     * @param accurateMatch accurate match
+     * @param jobState      state
+     * @return load infos
+     * @throws AnalysisException ex
+     */
+    public List<List<Comparable>> getLoadJobInfosByDb(long dbId, String dbName,
+                                                      String labelValue,
+                                                      boolean accurateMatch,
+                                                      JobState jobState) throws AnalysisException {
+        LinkedList<List<Comparable>> loadJobInfos = new LinkedList<>();
+        if (!Env.getCurrentEnv().getLabelProcessor().existLabelJobs(dbId)) {
+            return loadJobInfos;
+        }
+        readLock();
+        try {
+            List<InsertJob> loadJobList = Env.getCurrentEnv().getLabelProcessor()
+                    .filterLabelJobs(dbId, labelValue, accurateMatch);
+            // check state
+            for (InsertJob loadJob : loadJobList) {
+                try {
+                    if (jobState != null && !validState(jobState, loadJob)) {
+                        continue;
+                    }
+                    // add load job info, convert String list to Comparable list
+                    loadJobInfos.add(new ArrayList<>(loadJob.getShowInfo()));
+                } catch (RuntimeException e) {
+                    // ignore this load job
+                    log.warn("get load job info failed. job id: {}", loadJob.getJobId(), e);
+                }
+            }
+            return loadJobInfos;
+        } finally {
+            readUnlock();
+        }
+    }
+
+    private static boolean validState(JobState jobState, InsertJob loadJob) {
+        JobStatus status = loadJob.getJobStatus();
+        switch (status) {
+            case RUNNING:
+                return jobState == JobState.PENDING || jobState == JobState.ETL
+                        || jobState == JobState.LOADING || jobState == JobState.COMMITTED;
+            case STOPPED:
+                return jobState == JobState.CANCELLED;
+            case FINISHED:
+                return jobState == JobState.FINISHED;
+            default:
+                return false;
+        }
+    }
+
+    public void cancelLoadJob(CancelLoadStmt cs)
+            throws JobException, AnalysisException, DdlException {
+        String dbName = cs.getDbName();
+        String label = cs.getLabel();
+        String state = cs.getState();
+        CompoundPredicate.Operator operator = cs.getOperator();
+        Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(dbName);
+        // List of load jobs waiting to be cancelled
+        List<InsertJob> uncompletedLoadJob;
+        readLock();
+        try {
+            List<InsertJob> loadJobs = Env.getCurrentEnv().getLabelProcessor().getLabelJobs(db);
+            List<InsertJob> matchLoadJobs = Lists.newArrayList();
+            addNeedCancelLoadJob(label, state, operator, loadJobs, matchLoadJobs);
+            if (matchLoadJobs.isEmpty()) {
+                throw new JobException("Load job does not exist");
+            }
+            // check state here
+            uncompletedLoadJob =

Review Comment:
   ```suggestion
               unfinishedLoadJob =
   ```



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/load/LabelProcessor.java:
##########
@@ -0,0 +1,124 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.jobs.load;
+
+import org.apache.doris.catalog.Database;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.CaseSensibility;
+import org.apache.doris.common.LabelAlreadyUsedException;
+import org.apache.doris.common.PatternMatcher;
+import org.apache.doris.common.PatternMatcherWrapper;
+import org.apache.doris.job.exception.JobException;
+import org.apache.doris.job.extensions.insert.InsertJob;
+
+import com.google.common.base.Strings;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+/**
+ * label manager
+ */
+public class LabelProcessor {
+    private final Map<Long, Map<String, List<InsertJob>>> dbIdToLabelToLoadJobs = new ConcurrentHashMap<>();
+
+    /**
+     * getLabelJob
+     * @param db db
+     * @return label jobs
+     * @throws JobException e
+     */
+    public List<InsertJob> getLabelJobs(Database db) throws JobException {

Review Comment:
   The method names `getLabelJobs` and `addLabelJob` sound strange.
   How about just `getJobs` and `addJob`?
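A minimal sketch of the label index with the shorter names. `LabelIndex` is a hypothetical class, the type parameter `T` stands in for `InsertJob`, and it simplifies the diff's `List` per label to a single job per label. It also swaps the `containsKey`-then-`put` sequences seen in `unprotectAddJob` for `computeIfAbsent`/`putIfAbsent`, so the duplicate-label check and the insert happen atomically per map.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified label index using the names suggested above.
// T stands in for InsertJob; one job per label (the diff keeps a List per label).
final class LabelIndex<T> {
    private final Map<Long, Map<String, T>> dbIdToLabelToJob = new ConcurrentHashMap<>();

    // Registers a job under (dbId, label); rejects a label already used in that db.
    void addJob(long dbId, String label, T job) {
        Map<String, T> labelToJob =
                dbIdToLabelToJob.computeIfAbsent(dbId, k -> new ConcurrentHashMap<>());
        if (labelToJob.putIfAbsent(label, job) != null) {
            throw new IllegalStateException("Label [" + label + "] has already been used");
        }
    }

    // Returns all jobs registered for the database, or an empty list.
    List<T> getJobs(long dbId) {
        Map<String, T> labelToJob = dbIdToLabelToJob.get(dbId);
        if (labelToJob == null) {
            return Collections.emptyList();
        }
        return new ArrayList<>(labelToJob.values());
    }
}
```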



##########
fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java:
##########
@@ -303,7 +348,19 @@ public ShowResultSetMetaData getJobMetaData() {
         return builder.build();
     }
 
-    private static long getNextId() {

Review Comment:
   Why change the way the task id is generated?



##########
fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java:
##########
@@ -54,35 +54,73 @@
 @Log4j2
 public abstract class AbstractJob<T extends AbstractTask, C> implements Job<T, C>, Writable {
 
-    @SerializedName(value = "jid")
+    @SerializedName(value = "jobId")

Review Comment:
   Do not change it.
   We suggest using abbreviations for serialized names from now on, to minimize the edit log size.
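Renaming a serialized key also breaks compatibility: edit logs written by older FE versions still contain `jid`, so a reader that only knows `jobId` would silently lose the value on replay. Gson can read legacy names through `@SerializedName(value = ..., alternate = {...})`. The stdlib-only sketch below mimics that lookup on an already-parsed field map; `LegacyKeys` and `readLong` are illustrative names, not Doris or Gson APIs.

```java
import java.util.Map;

// Illustrative only: mimics how a deserializer can accept a legacy key
// (for example the old abbreviated "jid") alongside the preferred one.
final class LegacyKeys {
    private LegacyKeys() {
    }

    // Returns the value stored under `key`, else under the first alternate present, else null.
    static Long readLong(Map<String, Long> fields, String key, String... alternates) {
        if (fields.containsKey(key)) {
            return fields.get(key);
        }
        for (String alt : alternates) {
            if (fields.containsKey(alt)) {
                return fields.get(alt);
            }
        }
        return null;
    }
}
```

Keeping `jid` unchanged avoids needing such a fallback at all.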



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/load/LoadMgr.java:
##########
@@ -0,0 +1,340 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.jobs.load;
+
+import org.apache.doris.analysis.CompoundPredicate;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.CaseSensibility;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.LabelAlreadyUsedException;
+import org.apache.doris.common.PatternMatcher;
+import org.apache.doris.common.PatternMatcherWrapper;
+import org.apache.doris.common.util.LogBuilder;
+import org.apache.doris.common.util.LogKey;
+import org.apache.doris.job.base.JobExecuteType;
+import org.apache.doris.job.base.JobExecutionConfiguration;
+import org.apache.doris.job.common.JobStatus;
+import org.apache.doris.job.exception.JobException;
+import org.apache.doris.job.extensions.insert.InsertJob;
+import org.apache.doris.job.manager.JobManager;
+import org.apache.doris.load.loadv2.JobState;
+import org.apache.doris.nereids.jobs.load.replay.ReplayLoadLog;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+
+/**
+ * load manager
+ */
+public class LoadMgr {
+    private static final Logger LOG = LogManager.getLogger(LoadMgr.class);
+    private Map<Long, InsertJob> loadIdToJob = new HashMap<>();
+    private final Map<Long, Map<String, List<InsertJob>>> dbIdToLabelToLoadJobs = new ConcurrentHashMap<>();
+
+    // lock for export job
+    // lock is private and must use after db lock
+    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
+
+    private void readLock() {
+        lock.readLock().lock();
+    }
+
+    private void readUnlock() {
+        lock.readLock().unlock();
+    }
+
+    private void writeLock() {
+        lock.writeLock().lock();
+    }
+
+    private void writeUnlock() {
+        lock.writeLock().unlock();
+    }
+
+    private JobManager<InsertJob, ?> getJobManager() {
+        return Env.getCurrentEnv().getJobManager();
+    }
+
+    /**
+     * add load job and add tasks
+     * @param loadJob job
+     */
+    public void addLoadJob(InsertJob loadJob) throws DdlException {
+        writeLock();
+        try {
+            Map<String, List<InsertJob>> labelToLoadJobs = dbIdToLabelToLoadJobs.get(loadJob.getDbId());
+            if (labelToLoadJobs != null && labelToLoadJobs.containsKey(loadJob.getLabel())) {
+                throw new LabelAlreadyUsedException(loadJob.getLabel());
+            }
+            unprotectAddJob(loadJob);
+        } catch (LabelAlreadyUsedException e) {
+            throw new RuntimeException(e);
+        } finally {
+            writeUnlock();
+        }
+    }
+
+    private void unprotectAddJob(InsertJob job) throws DdlException {
+        loadIdToJob.put(job.getJobId(), job);
+        try {
+            getJobManager().registerJob(job);
+            if (!dbIdToLabelToLoadJobs.containsKey(job.getDbId())) {
+                dbIdToLabelToLoadJobs.put(job.getDbId(), new ConcurrentHashMap<>());
+            }
+            Map<String, List<InsertJob>> labelToLoadJobs = dbIdToLabelToLoadJobs.get(job.getDbId());
+            if (!labelToLoadJobs.containsKey(job.getLabel())) {
+                labelToLoadJobs.put(job.getLabel(), new ArrayList<>());
+            }
+            labelToLoadJobs.get(job.getLabel()).add(job);
+        } catch (org.apache.doris.job.exception.JobException e) {
+            throw new DdlException(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * replay load job
+     * @param replayLoadLog load log
+     * @throws DdlException ex
+     */
+    public void replayLoadJob(ReplayLoadLog replayLoadLog) throws DdlException {
+        writeLock();
+        try {
+            if (replayLoadLog instanceof ReplayLoadLog.ReplayCreateLoadLog) {
+                InsertJob loadJob = new InsertJob((ReplayLoadLog.ReplayCreateLoadLog) replayLoadLog);
+                JobExecutionConfiguration jobConfig = new JobExecutionConfiguration();
+                jobConfig.setExecuteType(JobExecuteType.INSTANT);
+                loadJob.setJobConfig(jobConfig);
+                addLoadJob(loadJob);
+                LOG.info(new LogBuilder(LogKey.LOAD_JOB, loadJob.getJobId()).add("msg", "replay create load job")
+                        .build());
+            } else if (replayLoadLog instanceof ReplayLoadLog.ReplayEndLoadLog) {
+                InsertJob job = loadIdToJob.get(replayLoadLog.getId());
+                if (job == null) {
+                    // This should not happen.
+                    // Last time I found that when user submit a job with already used label, an END_LOAD_JOB edit log
+                    // will be written but the job is not added to 'idToLoadJob', so this job here we got will be null.
+                    // And this bug has been fixed.
+                    // Just add a log here to observe.
+                    LOG.warn("job does not exist when replaying end load job edit log: {}", replayLoadLog);
+                    return;
+                }
+                job.unprotectReadEndOperation((ReplayLoadLog.ReplayEndLoadLog) replayLoadLog);
+                LOG.info(new LogBuilder(LogKey.LOAD_JOB, replayLoadLog.getId()).add("operation", replayLoadLog)
+                        .add("msg", "replay end load job").build());
+            } else {
+                throw new DdlException("Unsupported replay job type. ");
+            }
+        } finally {
+            writeUnlock();
+        }
+    }
+
+    //    public void updateJobProgress(Long jobId, Long beId, TUniqueId loadId, TUniqueId fragmentId, long scannedRows,
+    //                                  long scannedBytes, boolean isDone) {
+    //        LoadJobExecutor job = loadIdToJob.get(jobId);
+    //        if (job != null) {
+    //            job.updateLoadingStatus(beId, loadId, fragmentId, scannedRows, scannedBytes, isDone);
+    //        }
+    //    }
+
+    /**
+     * cancel job
+     *
+     * @param dbName   dbName
+     * @param label    job label
+     * @param state    job state
+     * @param operator filter operator, like or equals
+     */
+    public void cancelLoadJob(String dbName, String label, String state, CompoundPredicate.Operator operator)
+            throws JobException, AnalysisException, DdlException {
+        Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(dbName);
+        // List of load jobs waiting to be cancelled
+        List<InsertJob> uncompletedLoadJob;
+        readLock();
+        try {
+            Map<String, List<InsertJob>> labelToLoadJobs = dbIdToLabelToLoadJobs.get(db.getId());
+            if (labelToLoadJobs == null) {
+                throw new JobException("Load job does not exist");
+            }
+            List<InsertJob> matchLoadJobs = Lists.newArrayList();
+            addNeedCancelLoadJob(label, state, operator,
+                    labelToLoadJobs.values().stream().flatMap(Collection::stream).collect(Collectors.toList()),
+                    matchLoadJobs);
+            if (matchLoadJobs.isEmpty()) {
+                throw new JobException("Load job does not exist");
+            }
+            // check state here
+            uncompletedLoadJob =
+                    matchLoadJobs.stream().filter(InsertJob::isRunning)
+                            .collect(Collectors.toList());
+            if (uncompletedLoadJob.isEmpty()) {
+                throw new JobException("There is no uncompleted job");
+            }
+        } finally {
+            readUnlock();
+        }
+        for (InsertJob loadJob : uncompletedLoadJob) {
+            try {
+                loadJob.cancelJob();
+            } catch (JobException e) {
+                LOG.warn("Fail to cancel job, its label: {}", loadJob.getLabel());
+            }
+        }
+    }
+
+    private static void addNeedCancelLoadJob(String label, String state,

Review Comment:
   ```suggestion
       private void addNeedCancelLoadJob(String label, String state,
   ```



##########
fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java:
##########
@@ -65,58 +100,171 @@ public class InsertJob extends AbstractJob<InsertTask, Map> {
             new Column("CreateTime", ScalarType.createStringType()),
             new Column("Comment", ScalarType.createStringType()));
 
+    private static final ShowResultSetMetaData TASK_META_DATA =
+            ShowResultSetMetaData.builder()
+                    .addColumn(new Column("TaskId", ScalarType.createVarchar(20)))
+                    .addColumn(new Column("Label", ScalarType.createVarchar(20)))
+                    .addColumn(new Column("Status", ScalarType.createVarchar(20)))
+                    .addColumn(new Column("EtlInfo", ScalarType.createVarchar(20)))
+                    .addColumn(new Column("TaskInfo", ScalarType.createVarchar(20)))
+                    .addColumn(new Column("ErrorMsg", ScalarType.createVarchar(20)))
+
+                    .addColumn(new Column("CreateTimeMs", ScalarType.createVarchar(20)))
+                    .addColumn(new Column("FinishTimeMs", ScalarType.createVarchar(20)))
+                    .addColumn(new Column("TrackingUrl", ScalarType.createVarchar(20)))
+                    .addColumn(new Column("LoadStatistic", ScalarType.createVarchar(20)))
+                    .addColumn(new Column("User", ScalarType.createVarchar(20)))
+                    .build();
+
     public static final ImmutableMap<String, Integer> COLUMN_TO_INDEX;
 
     static {
-        ImmutableMap.Builder<String, Integer> builder = new ImmutableMap.Builder();
+        ImmutableMap.Builder<String, Integer> builder = new ImmutableMap.Builder<>();
         for (int i = 0; i < SCHEMA.size(); i++) {
             builder.put(SCHEMA.get(i).getName().toLowerCase(), i);
         }
         COLUMN_TO_INDEX = builder.build();
     }
 
-    @SerializedName(value = "lp")
-    String labelPrefix;
+    @SerializedName("taskIdList")
+    ConcurrentLinkedQueue<Long> taskIdList;
+    @SerializedName("dbId")
+    private final long dbId;
+    @SerializedName("labelName")
+    private String labelName;
+    @SerializedName("loadType")
+    private InsertJob.LoadType loadType;
+    // 0: the job status is pending
+    // n/100: n is the number of task which has been finished
+    // 99: all tasks have been finished
+    // 100: txn status is visible and load has been finished
+    @SerializedName("progress")

Review Comment:
   Use an abbreviation as the serialized name.
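To make the size argument concrete: each serialized job record repeats every key, so key length is paid once per record in the edit log. The sketch below renders the same three values with full and abbreviated keys as hand-built JSON (no Gson) and compares their UTF-8 sizes. `tis` is the old name from the diff; `di` and `ln` are hypothetical abbreviations chosen only for this illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative comparison of record sizes with full vs. abbreviated JSON keys.
final class KeySizeDemo {
    private KeySizeDemo() {
    }

    // Renders a flat map as a minimal JSON object; every value is written as a JSON string.
    static String toJson(Map<String, String> fields) {
        StringBuilder sb = new StringBuilder("{");
        boolean first = true;
        for (Map.Entry<String, String> e : fields.entrySet()) {
            if (!first) {
                sb.append(',');
            }
            first = false;
            sb.append('"').append(e.getKey()).append("\":\"").append(e.getValue()).append('"');
        }
        return sb.append('}').toString();
    }

    static int utf8Size(String s) {
        return s.getBytes(StandardCharsets.UTF_8).length;
    }

    // Same values under two naming schemes; only the key lengths differ.
    static Map<String, String> sampleRecord(String taskIdsKey, String dbIdKey, String labelKey) {
        Map<String, String> m = new LinkedHashMap<>();
        m.put(taskIdsKey, "[1,2,3]");
        m.put(dbIdKey, "10001");
        m.put(labelKey, "label_1");
        return m;
    }
}
```

Here the abbreviated record is 16 bytes smaller (23 vs. 7 key characters), a saving repeated for every job written to the log.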



##########
fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java:
##########
@@ -65,58 +100,171 @@ public class InsertJob extends AbstractJob<InsertTask, Map> {
             new Column("CreateTime", ScalarType.createStringType()),
             new Column("Comment", ScalarType.createStringType()));
 
+    private static final ShowResultSetMetaData TASK_META_DATA =
+            ShowResultSetMetaData.builder()
+                    .addColumn(new Column("TaskId", ScalarType.createVarchar(20)))

Review Comment:
   Use String as the column type for all columns. VARCHAR(20) is not long enough.
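Values such as a tracking URL or the `LoadStatistic` JSON will routinely exceed 20 characters. A minimal sketch of building the task metadata from one list of names, so every column gets the same unbounded string type. `ColumnSpec`, `TaskMetaSketch`, and the `"STRING"` tag are hypothetical stand-ins for Doris's `Column` and `ScalarType.createStringType()`; real code would keep using `ShowResultSetMetaData.builder()`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for org.apache.doris.catalog.Column.
final class ColumnSpec {
    final String name;
    final String type;

    ColumnSpec(String name, String type) {
        this.name = name;
        this.type = type;
    }
}

final class TaskMetaSketch {
    private TaskMetaSketch() {
    }

    // Column names copied from TASK_META_DATA in the diff.
    static final List<String> TASK_COLUMNS = List.of(
            "TaskId", "Label", "Status", "EtlInfo", "TaskInfo", "ErrorMsg",
            "CreateTimeMs", "FinishTimeMs", "TrackingUrl", "LoadStatistic", "User");

    // Every column gets the same unbounded string type instead of VARCHAR(20).
    static List<ColumnSpec> buildTaskColumns() {
        List<ColumnSpec> columns = new ArrayList<>();
        for (String name : TASK_COLUMNS) {
            columns.add(new ColumnSpec(name, "STRING"));
        }
        return Collections.unmodifiableList(columns);
    }
}
```

Driving the builder from a name list also keeps the column type in one place if it ever needs to change again.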



##########
fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java:
##########
@@ -125,23 +273,30 @@ public void cancelTaskById(long taskId) throws JobException {
     }
 
     @Override
-    public boolean isReadyForScheduling(Map taskContext) {
-        return CollectionUtils.isEmpty(getRunningTasks());
+    public void cancelAllTasks() throws JobException {
+        writeLock();

Review Comment:
   What does this lock protect?
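For context on the question: a job-level write lock is only meaningful if every mutation of the state it guards takes the same lock. The hypothetical `LockedTaskSet` below assumes the intent is to make "cancel every running task" atomic with respect to new tasks being added; it is an illustration of that pattern, not the PR's actual design.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical illustration: the lock guards the task map and the cancelled flag,
// and both writers below acquire it.
final class LockedTaskSet {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
    private final Map<Long, String> idToTaskState = new LinkedHashMap<>();
    private boolean cancelled = false;

    // Adding a task races with cancellation, so it takes the write lock too.
    boolean addTask(long taskId) {
        lock.writeLock().lock();
        try {
            if (cancelled) {
                return false; // no new tasks once the job has been cancelled
            }
            idToTaskState.put(taskId, "RUNNING");
            return true;
        } finally {
            lock.writeLock().unlock();
        }
    }

    // Check-then-act over the whole map: without the lock, a task added
    // concurrently could escape cancellation.
    List<Long> cancelAllTasks() {
        lock.writeLock().lock();
        try {
            cancelled = true;
            List<Long> cancelledIds = new ArrayList<>();
            for (Map.Entry<Long, String> e : idToTaskState.entrySet()) {
                if ("RUNNING".equals(e.getValue())) {
                    e.setValue("CANCELLED");
                    cancelledIds.add(e.getKey());
                }
            }
            return cancelledIds;
        } finally {
            lock.writeLock().unlock();
        }
    }
}
```

If no other code path takes the same lock, acquiring it in `cancelAllTasks` alone protects nothing.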



##########
fe/fe-core/src/main/java/org/apache/doris/job/base/Job.java:
##########
@@ -99,11 +99,34 @@ public interface Job<T extends AbstractTask, C> {
 
     /**
      * Cancels all running tasks of this job.
-     *
      * @throws JobException If cancelling a running task fails.
      */
     void cancelAllTasks() throws JobException;
 
+    /**
+     * register job
+     * @throws JobException If register job failed.
+     */
+    void onRegister() throws JobException;
+
+    /**
+     * register job failed
+     * @throws JobException If failed.
+     */
+    void onRegisterFailed() throws JobException;
+
+    /**
+     * relay create job

Review Comment:
   ```suggestion
        * replay create job
   ```



##########
fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java:
##########
@@ -74,30 +72,70 @@ public class InsertTask extends AbstractTask {
     }
 
     private String labelName;
-
     private InsertIntoTableCommand command;
-
     private StmtExecutor stmtExecutor;
-
     private ConnectContext ctx;
-
     private String sql;
-
     private String currentDb;
-
     private UserIdentity userIdentity;
-
+    private LoadStatistic loadStatistic;
     private AtomicBoolean isCanceled = new AtomicBoolean(false);
-
     private AtomicBoolean isFinished = new AtomicBoolean(false);
-
     private static final String LABEL_SPLITTER = "_";
 
+    private FailMsg failMsg;
+    @Getter
+    private String trackingUrl;
 
     @Getter
     @Setter
-    private LoadJob loadJob;
+    private InsertJob jobInfo;
+    private TaskType taskType = TaskType.PENDING;
+    private MergeType mergeType = MergeType.APPEND;
+
+    /**
+     * task merge type
+     */
+    enum MergeType {
+        MERGE,
+        APPEND,
+        DELETE
+    }
+
+    /**
+     * task type
+     */
+    enum TaskType {
+        UNKNOWN, // this is only for ISSUE #2354
+        PENDING,
+        LOADING,
+        FINISHED,
+        FAILED,
+        CANCELLED
+    }
+
+    public InsertTask(InsertIntoTableCommand insertInto,
+                      ConnectContext ctx, StmtExecutor executor, LoadStatistic statistic) {
+        this(null, insertInto, ctx, executor, statistic);
+    }
+
+    public InsertTask(String labelName, String currentDb, String sql, UserIdentity userIdentity) {
+        this.labelName = labelName;
+        this.sql = sql;
+        this.currentDb = currentDb;
+        this.userIdentity = userIdentity;
+        setTaskId(Env.getCurrentEnv().getNextId());
+    }
 
+    public InsertTask(String labelName, InsertIntoTableCommand insertInto,
+                       ConnectContext ctx, StmtExecutor executor, LoadStatistic statistic) {
+        this.labelName = labelName;

Review Comment:
   The label is not used in `InsertTask`,
   so I think we can merge this constructor with the first one.
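A minimal sketch of the merged constructor, assuming the label parameter is simply dropped because the task never reads it. `TaskSketch` is hypothetical, and `Object` stands in for `InsertIntoTableCommand`, `ConnectContext`, `StmtExecutor`, and `LoadStatistic` so the example compiles on its own.

```java
// Hypothetical, trimmed-down InsertTask: Object stands in for the Doris types.
final class TaskSketch {
    private final Object command;
    private final Object ctx;
    private final Object executor;
    private final Object statistic;

    // Single constructor replacing (command, ...) and (labelName, command, ...):
    // the label parameter is dropped because the task never reads it.
    TaskSketch(Object command, Object ctx, Object executor, Object statistic) {
        this.command = command;
        this.ctx = ctx;
        this.executor = executor;
        this.statistic = statistic;
    }

    Object getCommand() {
        return command;
    }
}
```

This removes the `this(null, ...)` delegation and the unused `labelName` argument in one step.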



##########
fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java:
##########
@@ -203,27 +351,273 @@ public void onTaskSuccess(InsertTask task) throws JobException {
 
     @Override
     public List<String> getShowInfo() {
-        return super.getCommonShowInfo();
+        readLock();
+        try {
+            // check auth
+            checkAuth("SHOW LOAD");
+            List<String> jobInfo = Lists.newArrayList();
+            // jobId
+            jobInfo.add(getJobId().toString());
+            // label
+            jobInfo.add(getLabelName());
+            // state
+            jobInfo.add(getJobStatus().name());
+
+            // progress
+            String progress = Env.getCurrentProgressManager().getProgressInfo(String.valueOf(getJobId()));
+            switch (getJobStatus()) {
+                case RUNNING:
+                    if (isPending()) {
+                        jobInfo.add("ETL:0%; LOAD:0%");
+                    } else {
+                        jobInfo.add("ETL:100%; LOAD:" + progress + "%");
+                    }
+                    break;
+                case FINISHED:
+                    jobInfo.add("ETL:100%; LOAD:100%");
+                    break;
+                case STOPPED:
+                default:
+                    jobInfo.add("ETL:N/A; LOAD:N/A");
+                    break;
+            }
+            // type
+            jobInfo.add(loadType.name());
+
+            // etl info
+            if (loadStatistic.getCounters().size() == 0) {
+                jobInfo.add(FeConstants.null_string);
+            } else {
+                jobInfo.add(Joiner.on("; ").withKeyValueSeparator("=").join(loadStatistic.getCounters()));
+            }
+
+            // task info
+            jobInfo.add("cluster:" + getResourceName() + "; timeout(s):" + getTimeout()
+                    + "; max_filter_ratio:" + getMaxFilterRatio() + "; priority:" + getPriority());
+            // error msg
+            if (failMsg == null) {
+                jobInfo.add(FeConstants.null_string);
+            } else {
+                jobInfo.add("type:" + failMsg.getCancelType() + "; msg:" + failMsg.getMsg());
+            }
+
+            // create time
+            jobInfo.add(TimeUtils.longToTimeString(getCreateTimeMs()));
+            // etl start time
+            jobInfo.add(TimeUtils.longToTimeString(getStartTimeMs()));
+            // etl end time
+            jobInfo.add(TimeUtils.longToTimeString(getStartTimeMs()));
+            // load start time
+            jobInfo.add(TimeUtils.longToTimeString(getStartTimeMs()));
+            // load end time
+            jobInfo.add(TimeUtils.longToTimeString(getFinishTimeMs()));
+            // tracking urls
+            List<String> trackingUrl = idToTasks.values().stream()
+                    .map(InsertTask::getTrackingUrl)
+                    .collect(Collectors.toList());
+            if (trackingUrl.isEmpty()) {
+                jobInfo.add(FeConstants.null_string);
+            } else {
+                jobInfo.add(trackingUrl.toString());
+            }
+            // job details
+            jobInfo.add(loadStatistic.toJson());
+            // transaction id
+            jobInfo.add(String.valueOf(0));
+            // error tablets
+            jobInfo.add(errorTabletsToJson());
+            // user, some load job may not have user info
+            if (getCreateUser() == null || getCreateUser().getQualifiedUser() == null) {
+                jobInfo.add(FeConstants.null_string);
+            } else {
+                jobInfo.add(getCreateUser().getQualifiedUser());
+            }
+            // comment
+            jobInfo.add(getComment());
+            return jobInfo;
+        } catch (DdlException e) {
+            throw new RuntimeException(e);
+        } finally {
+            readUnlock();
+        }
+    }
+
+    private String getPriority() {
+        return properties.getOrDefault(LoadStmt.PRIORITY, Priority.NORMAL.name());
+    }
+
+    public double getMaxFilterRatio() {
+        return Double.parseDouble(properties.getOrDefault(LoadStmt.MAX_FILTER_RATIO_PROPERTY, "0.0"));
+    }
+
+    public long getTimeout() {
+        if (properties.containsKey(LoadStmt.TIMEOUT_PROPERTY)) {
+            return Long.parseLong(properties.get(LoadStmt.TIMEOUT_PROPERTY));
+        }
+        return Config.broker_load_default_timeout_second;
+    }
+
+
+    public static InsertJob readFields(DataInput in) throws IOException {
+        String jsonJob = Text.readString(in);
+        InsertJob job = GsonUtils.GSON.fromJson(jsonJob, InsertJob.class);
+        job.setRunningTasks(new ArrayList<>());
+        return job;
     }
 
     @Override
     public void write(DataOutput out) throws IOException {
         Text.writeString(out, GsonUtils.GSON.toJson(this));
     }
 
-    private static final ShowResultSetMetaData TASK_META_DATA =
-            ShowResultSetMetaData.builder()
-                    .addColumn(new Column("TaskId", ScalarType.createVarchar(20)))
-                    .addColumn(new Column("Label", ScalarType.createVarchar(20)))
-                    .addColumn(new Column("Status", ScalarType.createVarchar(20)))
-                    .addColumn(new Column("EtlInfo", ScalarType.createVarchar(20)))
-                    .addColumn(new Column("TaskInfo", ScalarType.createVarchar(20)))
-                    .addColumn(new Column("ErrorMsg", ScalarType.createVarchar(20)))
+    public String errorTabletsToJson() {
+        Map<Long, String> map = new HashMap<>();
+        errorTabletInfos.stream().limit(Config.max_error_tablet_of_broker_load)
+                .forEach(p -> map.put(p.getTabletId(), p.getMsg()));
+        Gson gson = new GsonBuilder().disableHtmlEscaping().create();
+        return gson.toJson(map);
+    }
 
-                    .addColumn(new Column("CreateTimeMs", ScalarType.createVarchar(20)))
-                    .addColumn(new Column("FinishTimeMs", ScalarType.createVarchar(20)))
-                    .addColumn(new Column("TrackingUrl", ScalarType.createVarchar(20)))
-                    .addColumn(new Column("LoadStatistic", ScalarType.createVarchar(20)))
-                    .addColumn(new Column("User", ScalarType.createVarchar(20)))
-                    .build();
+    public void updateLoadingStatus(Long beId, TUniqueId loadId, TUniqueId fragmentId, long scannedRows,
+                                    long scannedBytes, boolean isDone) {
+        loadStatistic.updateLoadProgress(beId, loadId, fragmentId, scannedRows, scannedBytes, isDone);
+        progress = (int) ((double) finishedTaskIds.size() / idToTasks.size() * 100);
+        if (progress == 100) {
+            progress = 99;
+        }
+    }
+
+    private void checkAuth(String command) throws DdlException {
+        if (authorizationInfo == null) {
+            // use the old method to check priv
+            checkAuthWithoutAuthInfo(command);
+            return;
+        }
+        if (!Env.getCurrentEnv().getAccessManager().checkPrivByAuthInfo(ConnectContext.get(), authorizationInfo,
+                PrivPredicate.LOAD)) {
+            ErrorReport.reportDdlException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR,
+                    Privilege.LOAD_PRIV);
+        }
+    }
+
+    /**
+     * This method is compatible with old load job without authorization info
+     * If db or table name could not be found by id, it will throw the NOT_EXISTS_ERROR
+     *
+     * @throws DdlException
+     */
+    private void checkAuthWithoutAuthInfo(String command) throws DdlException {
+        Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(dbId);
+        // check auth
+        if (tableNames.isEmpty()) {
+            // forward compatibility
+            if (!Env.getCurrentEnv().getAccessManager().checkDbPriv(ConnectContext.get(), db.getFullName(),
+                    PrivPredicate.LOAD)) {
+                ErrorReport.reportDdlException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR,
+                        Privilege.LOAD_PRIV);
+            }
+        } else {
+            for (String tblName : tableNames) {
+                if (!Env.getCurrentEnv().getAccessManager().checkTblPriv(ConnectContext.get(), db.getFullName(),
+                        tblName, PrivPredicate.LOAD)) {
+                    ErrorReport.reportDdlException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR,
+                            command,
+                            ConnectContext.get().getQualifiedUser(),
+                            ConnectContext.get().getRemoteIP(), db.getFullName() + ": " + tblName);
+                }
+            }
+        }
+    }
+
+    public void unprotectReadEndOperation(InsertJob replayLog) {
+        setJobStatus(replayLog.getJobStatus());
+        progress = replayLog.getProgress();
+        setStartTimeMs(replayLog.getStartTimeMs());
+        setFinishTimeMs(replayLog.getFinishTimeMs());
+        failMsg = replayLog.getFailMsg();
+    }
+
+    public String getResourceName() {
+        // TODO: get tvf param from tvf relation
+        return LoadType.BULK.name();
+    }
+
+    public boolean isRunning() {
+        return getJobStatus() != JobStatus.FINISHED;
+    }
+
+    public boolean isPending() {
+        return getJobStatus() != JobStatus.FINISHED;
+    }
+
+    public boolean isCommitted() {

Review Comment:
   Remove the unused methods `isCommitted()` and `isFinished()`.
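For reference, the `updateLoadingStatus` hunk above converts the share of finished tasks into a percentage. It caps the value at 99, because the field comment in `InsertJob` reserves 100 for "txn status is visible". The sketch below isolates only that arithmetic. `LoadProgress.compute` is a hypothetical helper and is not part of the PR. When no tasks exist yet, the original expression evaluates `0.0 / 0`, which is `NaN`, and Java's `(int)` cast turns `NaN` into `0`. So the original does not throw; the sketch makes that case explicit instead.

```java
// Hypothetical helper; mirrors the arithmetic in InsertJob.updateLoadingStatus().
final class LoadProgress {
    private LoadProgress() {
    }

    /**
     * Returns 0..99 while tasks are running. 100 is reserved for the point
     * where the load transaction becomes visible.
     */
    static int compute(int finishedTasks, int totalTasks) {
        if (totalTasks <= 0) {
            // Nothing scheduled yet; the original relies on (int) Double.NaN == 0 here.
            return 0;
        }
        int progress = (int) ((double) finishedTasks / totalTasks * 100);
        // Same effect as the original "if (progress == 100) progress = 99" when finished <= total.
        return Math.min(progress, 99);
    }
}
```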



##########
fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java:
##########
@@ -26,57 +34,105 @@
 import org.apache.doris.job.common.JobType;
 import org.apache.doris.job.common.TaskType;
 import org.apache.doris.job.exception.JobException;
+import org.apache.doris.job.extensions.insert.InsertJob;
 import org.apache.doris.job.scheduler.JobScheduler;
 import org.apache.doris.job.task.AbstractTask;
+import org.apache.doris.load.loadv2.JobState;
 
+import com.google.common.collect.Lists;
 import lombok.extern.log4j.Log4j2;
 import org.apache.commons.lang3.StringUtils;
 
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.stream.Collectors;
 
 @Log4j2
 public class JobManager<T extends AbstractJob<?, C>, C> implements Writable {
 
-
     private final ConcurrentHashMap<Long, T> jobMap = new ConcurrentHashMap<>(32);
 
-    private JobScheduler jobScheduler;
+    private JobScheduler<T, C> jobScheduler;
+
+    // lock for job
+    // lock is private and must use after db lock
+    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
+
+    private void readLock() {
+        lock.readLock().lock();
+    }
+
+    private void readUnlock() {
+        lock.readLock().unlock();
+    }
+
+    private void writeLock() {
+        lock.writeLock().lock();
+    }
+
+    private void writeUnlock() {
+        lock.writeLock().unlock();
+    }
 
     public void start() {
-        jobScheduler = new JobScheduler(jobMap);
+        jobScheduler = new JobScheduler<T, C>(jobMap);
         jobScheduler.start();
     }
 
+
+    /**
+     * get running job
+     *
+     * @param jobId id
+     * @return running job
+     */
+    public T getJob(long jobId) {
+        return jobMap.get(jobId);
+    }
+
     public void registerJob(T job) throws JobException {
-        job.checkJobParams();
-        checkJobNameExist(job.getJobName());
-        if (jobMap.get(job.getJobId()) != null) {
-            throw new JobException("job id exist, jobId:" + job.getJobId());
+        writeLock();
+        try {
+            job.onRegister();
+            job.checkJobParams();
+            checkJobNameExist(job.getJobName());
+            if (jobMap.get(job.getJobId()) != null) {
+                throw new JobException("job id exist, jobId:" + job.getJobId());
+            }
+            jobMap.put(job.getJobId(), job);
+            //check its need to scheduler
+            jobScheduler.scheduleOneJob(job);
+            job.logCreateOperation();
+        } catch (JobException e) {
+            // job.onRegisterFailed();

Review Comment:
   Why is this commented out?
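Some context for the question above: in the new `registerJob`, the job is put into `jobMap` before `scheduleOneJob` and `logCreateOperation` run. If one of those later steps throws, the half-registered job remains in the map unless a step removes it. Presumably `onRegisterFailed()` was intended for that cleanup. The sketch below shows one way to register under a write lock and roll back on failure. `JobRegistry`, `SimpleJob`, and `RegistrationException` are hypothetical stand-ins and not the Doris `JobManager` API.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical stand-ins; not the Doris JobManager/AbstractJob API.
class RegistrationException extends Exception {
    RegistrationException(String msg) {
        super(msg);
    }
}

interface SimpleJob {
    long id();

    String name();

    // May fail after the job has already been put into the map.
    void schedule() throws RegistrationException;
}

class JobRegistry {
    private final Map<Long, SimpleJob> jobs = new ConcurrentHashMap<>();
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);

    void register(SimpleJob job) throws RegistrationException {
        lock.writeLock().lock();
        try {
            for (SimpleJob existing : jobs.values()) {
                if (existing.name().equals(job.name())) {
                    throw new RegistrationException("job name exists: " + job.name());
                }
            }
            if (jobs.putIfAbsent(job.id(), job) != null) {
                throw new RegistrationException("job id exists: " + job.id());
            }
            try {
                job.schedule();
            } catch (RegistrationException e) {
                // Roll back so that a failed registration leaves no trace in the map.
                jobs.remove(job.id());
                throw e;
            }
        } finally {
            // Always released, including on the failure paths above.
            lock.writeLock().unlock();
        }
    }

    boolean contains(long id) {
        lock.readLock().lock();
        try {
            return jobs.containsKey(id);
        } finally {
            lock.readLock().unlock();
        }
    }
}
```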



##########
fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java:
##########
@@ -65,58 +100,171 @@ public class InsertJob extends AbstractJob<InsertTask, Map> {
             new Column("CreateTime", ScalarType.createStringType()),
             new Column("Comment", ScalarType.createStringType()));
 
+    private static final ShowResultSetMetaData TASK_META_DATA =
+            ShowResultSetMetaData.builder()
+                    .addColumn(new Column("TaskId", ScalarType.createVarchar(20)))
+                    .addColumn(new Column("Label", ScalarType.createVarchar(20)))
+                    .addColumn(new Column("Status", ScalarType.createVarchar(20)))
+                    .addColumn(new Column("EtlInfo", ScalarType.createVarchar(20)))
+                    .addColumn(new Column("TaskInfo", ScalarType.createVarchar(20)))
+                    .addColumn(new Column("ErrorMsg", ScalarType.createVarchar(20)))
+
+                    .addColumn(new Column("CreateTimeMs", ScalarType.createVarchar(20)))
+                    .addColumn(new Column("FinishTimeMs", ScalarType.createVarchar(20)))
+                    .addColumn(new Column("TrackingUrl", ScalarType.createVarchar(20)))
+                    .addColumn(new Column("LoadStatistic", ScalarType.createVarchar(20)))
+                    .addColumn(new Column("User", ScalarType.createVarchar(20)))
+                    .build();
+
     public static final ImmutableMap<String, Integer> COLUMN_TO_INDEX;
 
     static {
-        ImmutableMap.Builder<String, Integer> builder = new ImmutableMap.Builder();
+        ImmutableMap.Builder<String, Integer> builder = new ImmutableMap.Builder<>();
         for (int i = 0; i < SCHEMA.size(); i++) {
             builder.put(SCHEMA.get(i).getName().toLowerCase(), i);
         }
         COLUMN_TO_INDEX = builder.build();
     }
 
-    @SerializedName(value = "lp")
-    String labelPrefix;
+    @SerializedName("taskIdList")
+    ConcurrentLinkedQueue<Long> taskIdList;
+    @SerializedName("dbId")
+    private final long dbId;
+    @SerializedName("labelName")
+    private String labelName;
+    @SerializedName("loadType")
+    private InsertJob.LoadType loadType;
+    // 0: the job status is pending
+    // n/100: n is the number of task which has been finished
+    // 99: all tasks have been finished
+    // 100: txn status is visible and load has been finished
+    @SerializedName("progress")
+    private int progress;
+    @SerializedName("failMsg")
+    private FailMsg failMsg;
+    @SerializedName("plans")
+    private List<InsertIntoTableCommand> plans;
+    private LoadStatistic loadStatistic = new LoadStatistic();
+    private Set<Long> finishedTaskIds = new HashSet<>();
+    private Set<String> tableNames;
+    private ConcurrentHashMap<Long, InsertTask> idToTasks = new ConcurrentHashMap<>();
+    private Map<String, String> properties;
+    private AuthorizationInfo authorizationInfo;
+    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
+
+    private ConnectContext ctx;
+    private StmtExecutor stmtExecutor;
+    private List<ErrorTabletInfo> errorTabletInfos = new ArrayList<>();
+    private List<TabletCommitInfo> commitInfos = new ArrayList<>();
+
+    // max save task num, do we need to config it?
+    private static final int MAX_SAVE_TASK_NUM = 100;
+
+    /**
+     * load job type
+     */
+    public enum LoadType {
+        BULK,
+        SPARK,
+        LOCAL_FILE,
+        UNKNOWN
+
+    }
+
+    public enum Priority {
+        HIGH(0),
+        NORMAL(1),
+        LOW(2);
 
-    InsertIntoTableCommand command;
+        Priority(int value) {
+            this.value = value;
+        }
 
-    StmtExecutor stmtExecutor;
+        private final int value;
 
-    ConnectContext ctx;
+        public int getValue() {
+            return value;
+        }
+    }
 
-    @SerializedName("tis")
-    ConcurrentLinkedQueue<Long> taskIdList;
+    public InsertJob(Long jobId, String jobName,
+                     JobStatus jobStatus,
+                     LabelName labelName,
+                     String comment,
+                     UserIdentity createUser,
+                     JobExecutionConfiguration jobConfig,
+                     Long createTimeMs,
+                     String executeSql) {
+        super(jobId, jobName, jobStatus, labelName.getDbName(), comment, createUser,
+                jobConfig, createTimeMs, executeSql, null);
+        this.dbId = ConnectContext.get().getCurrentDbId();
+        this.labelName = labelName.getLabelName();
+    }
 
-    // max save task num, do we need to config it?
-    private static final int MAX_SAVE_TASK_NUM = 100;
+    public InsertJob(ConnectContext ctx,
+                      StmtExecutor executor,
+                      String labelName,
+                      List<InsertIntoTableCommand> plans,
+                      Set<String> sinkTableNames,
+                      Map<String, String> properties,
+                      String comment,
+                      JobExecutionConfiguration jobConfig) {
+        super(Env.getCurrentEnv().getNextId(), labelName, JobStatus.RUNNING, null,
+                comment, ctx.getCurrentUserIdentity(), jobConfig);
+        this.ctx = ctx;
+        this.plans = plans;
+        this.stmtExecutor = executor;
+        this.dbId = ctx.getCurrentDbId();
+        this.labelName = labelName;
+        this.tableNames = sinkTableNames;
+        this.properties = properties;
+        // TODO: not support other type yet
+        this.loadType = InsertJob.LoadType.BULK;
+    }
 
     @Override
-    public List<InsertTask> createTasks(TaskType taskType, Map taskContext) {
-        //nothing need to do in insert job
-        InsertTask task = new InsertTask(null, getCurrentDbName(), getExecuteSql(), getCreateUser());
-        task.setJobId(getJobId());
-        task.setTaskType(taskType);
-        task.setTaskId(Env.getCurrentEnv().getNextId());
-        ArrayList<InsertTask> tasks = new ArrayList<>();
-        tasks.add(task);
-        super.initTasks(tasks);
-        addNewTask(task.getTaskId());
-        return tasks;
+    public List<InsertTask> createTasks(TaskType taskType, Map<Object, Object> taskContext) {
+        if (plans.isEmpty()) {
+            InsertTask task = new InsertTask(labelName, getCurrentDbName(), getExecuteSql(), getCreateUser());
+            task.setTaskType(taskType);
+            task.setJobId(getJobId());
+            task.setCreateTimeMs(System.currentTimeMillis());
+            task.setStatus(TaskStatus.PENDING);
+            ArrayList<InsertTask> tasks = new ArrayList<>();
+            tasks.add(task);
+            super.initTasks(tasks);
+            addNewTask(task.getTaskId());
+            return tasks;
+        } else {
+            return createBatchTasks(taskType);
+        }
     }
 
-    public void addNewTask(long id) {
+    private List<InsertTask> createBatchTasks(TaskType taskType) {

Review Comment:
   We should unify `createBatchTasks()` and `createTasks()`.
   For example, why does `createBatchTasks()` call `idToTasks.put(task.getTaskId(), task);` while `createTasks()` does not?
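To illustrate the unification suggested above, here is a minimal sketch in which both the plain INSERT case (no plans) and the batch case pass through a single registration helper. With one helper, the `idToTasks` and `taskIdList` bookkeeping cannot diverge between the two paths. `SketchTask` and `SketchInsertJob` are hypothetical simplified types. The `AtomicLong` stands in for `Env.getCurrentEnv().getNextId()`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical simplified types; not the Doris InsertJob/InsertTask classes.
class SketchTask {
    private static final AtomicLong NEXT_ID = new AtomicLong(1); // stand-in for Env.getNextId()
    final long taskId = NEXT_ID.getAndIncrement();
    final String sql;

    SketchTask(String sql) {
        this.sql = sql;
    }
}

class SketchInsertJob {
    private final String executeSql;
    private final List<String> plans; // one entry per sink plan; empty for a plain INSERT
    final Map<Long, SketchTask> idToTasks = new ConcurrentHashMap<>();
    final ConcurrentLinkedQueue<Long> taskIdList = new ConcurrentLinkedQueue<>();

    SketchInsertJob(String executeSql, List<String> plans) {
        this.executeSql = executeSql;
        this.plans = plans;
    }

    /** Single entry point: both cases go through registerTask(). */
    List<SketchTask> createTasks() {
        List<String> sqls = plans.isEmpty() ? Collections.singletonList(executeSql) : plans;
        List<SketchTask> tasks = new ArrayList<>();
        for (String sql : sqls) {
            tasks.add(registerTask(new SketchTask(sql)));
        }
        return tasks;
    }

    private SketchTask registerTask(SketchTask task) {
        // Identical bookkeeping for every path.
        idToTasks.put(task.taskId, task);
        taskIdList.add(task.taskId);
        return task;
    }
}
```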



##########
fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java:
##########
@@ -95,7 +151,7 @@ public void unregisterJob(String jobName) throws JobException {
     public void alterJobStatus(Long jobId, JobStatus status) throws JobException {

Review Comment:
   ```suggestion
       private void alterJobStatus(Long jobId, JobStatus status) throws JobException {
   ```



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/load/LabelProcessor.java:
##########
@@ -0,0 +1,124 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.jobs.load;
+
+import org.apache.doris.catalog.Database;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.CaseSensibility;
+import org.apache.doris.common.LabelAlreadyUsedException;
+import org.apache.doris.common.PatternMatcher;
+import org.apache.doris.common.PatternMatcherWrapper;
+import org.apache.doris.job.exception.JobException;
+import org.apache.doris.job.extensions.insert.InsertJob;
+
+import com.google.common.base.Strings;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+/**
+ * label manager
+ */
+public class LabelProcessor {
+    private final Map<Long, Map<String, List<InsertJob>>> dbIdToLabelToLoadJobs = new ConcurrentHashMap<>();

Review Comment:
   This map should be protected by a lock.
   Not because of concurrent access as such, but so that compound operations on it are atomic.
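The point above is that `ConcurrentHashMap` makes each individual call thread-safe. However, "check whether the label is already used, then add the job" spans several calls, so two callers could both pass the check. The sketch below guards a plain nested map with a single `ReentrantReadWriteLock`, which is the lock type `InsertJob` and `JobManager` already use in this PR. Readers take the read lock and receive a copy. `LabelIndex` and its methods are hypothetical. Jobs are reduced to IDs for brevity.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical simplified label index: dbId -> label -> job ids.
class LabelIndex {
    private final Map<Long, Map<String, List<Long>>> dbIdToLabelToJobIds = new HashMap<>();
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);

    /** Check-then-insert runs as one atomic step under the write lock. */
    void addJob(long dbId, String label, long jobId) {
        lock.writeLock().lock();
        try {
            Map<String, List<Long>> labelToJobs =
                    dbIdToLabelToJobIds.computeIfAbsent(dbId, k -> new HashMap<>());
            if (labelToJobs.containsKey(label)) {
                throw new IllegalStateException("label already used: " + label);
            }
            labelToJobs.computeIfAbsent(label, k -> new ArrayList<>()).add(jobId);
        } finally {
            lock.writeLock().unlock();
        }
    }

    /** Readers hold the read lock and get a copy, so they never see a half-applied update. */
    List<Long> getJobIds(long dbId) {
        lock.readLock().lock();
        try {
            List<Long> result = new ArrayList<>();
            Map<String, List<Long>> labelToJobs = dbIdToLabelToJobIds.get(dbId);
            if (labelToJobs != null) {
                labelToJobs.values().forEach(result::addAll);
            }
            return result;
        } finally {
            lock.readLock().unlock();
        }
    }
}
```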



##########
fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java:
##########
@@ -74,30 +72,70 @@ public class InsertTask extends AbstractTask {
     }
 
     private String labelName;

Review Comment:
   This field is unused



##########
fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java:
##########
@@ -130,22 +168,14 @@ protected TUniqueId generateQueryId(String taskIdString) {
         return new TUniqueId(taskId.getMostSignificantBits(), taskId.getLeastSignificantBits());
     }
 
-    public InsertTask(String labelName, String currentDb, String sql, UserIdentity userIdentity) {
-        this.labelName = labelName;
-        this.sql = sql;
-        this.currentDb = currentDb;
-        this.userIdentity = userIdentity;
-
-    }
-
     @Override
     public void run() throws JobException {
         try {
             if (isCanceled.get()) {
                 log.info("task has been canceled, task id is {}", getTaskId());
                 return;
             }
-            command.run(ctx, stmtExecutor);
+            command.runWithStatistic(ctx, stmtExecutor, loadStatistic);

Review Comment:
   `Statistic` always refers to the statistics used by the CBO.
   It would be better to choose a different name.



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/load/LabelProcessor.java:
##########
@@ -0,0 +1,124 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.jobs.load;
+
+import org.apache.doris.catalog.Database;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.CaseSensibility;
+import org.apache.doris.common.LabelAlreadyUsedException;
+import org.apache.doris.common.PatternMatcher;
+import org.apache.doris.common.PatternMatcherWrapper;
+import org.apache.doris.job.exception.JobException;
+import org.apache.doris.job.extensions.insert.InsertJob;
+
+import com.google.common.base.Strings;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+/**
+ * label manager
+ */
+public class LabelProcessor {
+    private final Map<Long, Map<String, List<InsertJob>>> dbIdToLabelToLoadJobs = new ConcurrentHashMap<>();
+
+    /**
+     * getLabelJob
+     * @param db db
+     * @return label jobs
+     * @throws JobException e
+     */
+    public List<InsertJob> getLabelJobs(Database db) throws JobException {

Review Comment:
   Same issue for `filterLabelJobs`




Re: [PR] [feature](Load)(step2)support nereids load job schedule [doris]

Posted by "wsjz (via GitHub)" <gi...@apache.org>.
wsjz commented on code in PR #26356:
URL: https://github.com/apache/doris/pull/26356#discussion_r1427652917


##########
fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java:
##########
@@ -303,7 +348,19 @@ public ShowResultSetMetaData getJobMetaData() {
         return builder.build();
     }
 
-    private static long getNextId() {

Review Comment:
   We will generate the task ID in the constructor.




Re: [PR] [feature](Load)(step2)support nereids load job schedule [doris]

Posted by "doris-robot (via GitHub)" <gi...@apache.org>.
doris-robot commented on PR #26356:
URL: https://github.com/apache/doris/pull/26356#issuecomment-1868842444

   (From new machine) TeamCity pipeline, clickbench performance test result:
    the sum of best hot time: 44.88 seconds
    stream load tsv:          560 seconds loaded 74807831229 Bytes, about 127 MB/s
    stream load json:         19 seconds loaded 2358488459 Bytes, about 118 MB/s
    stream load orc:          66 seconds loaded 1101869774 Bytes, about 15 MB/s
    stream load parquet:      32 seconds loaded 861443392 Bytes, about 25 MB/s
    insert into select:       28.6 seconds inserted 10000000 Rows, about 349K ops/s
    storage size: 17183712737 Bytes
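The throughput figures in this report can be reproduced from the byte counts and durations if two assumptions hold: "MB" means MiB (2^20 bytes), and values are truncated rather than rounded. Truncation is suggested by the ORC line: 1101869774 / 66 / 2^20 is about 15.9, yet the report prints 15. These assumptions are inferred from the numbers and are not documented by the pipeline. `ClickBenchThroughput` is a hypothetical helper.

```java
// Hypothetical helper reproducing the report's throughput arithmetic.
final class ClickBenchThroughput {
    private static final long MIB = 1024L * 1024L; // assumption: "MB" in the report is MiB

    private ClickBenchThroughput() {
    }

    /** Whole MiB per second; integer division truncates, matching the reported values. */
    static long mibPerSecond(long bytes, long seconds) {
        return bytes / seconds / MIB;
    }

    /** Thousands of rows per second, truncated (assumption: "K" means 1000). */
    static long kiloRowsPerSecond(long rows, double seconds) {
        return (long) (rows / seconds) / 1000;
    }
}
```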



Re: [PR] [feature](Load)(step2)support nereids load job schedule [doris]

Posted by "wsjz (via GitHub)" <gi...@apache.org>.
wsjz commented on PR #26356:
URL: https://github.com/apache/doris/pull/26356#issuecomment-1868824661

   run buildall



Re: [PR] [feature](Load)(step2)support nereids load job schedule [doris]

Posted by "wsjz (via GitHub)" <gi...@apache.org>.
wsjz commented on PR #26356:
URL: https://github.com/apache/doris/pull/26356#issuecomment-1851530308

   run buildall



Re: [PR] [feature](Load)(step2)support nereids load job schedule [doris]

Posted by "doris-robot (via GitHub)" <gi...@apache.org>.
doris-robot commented on PR #26356:
URL: https://github.com/apache/doris/pull/26356#issuecomment-1859543017

   
   TPC-H test result on machine: 'aliyun_ecs.c7a.8xlarge_32C64G'
   ```
   Tpch sf100 test result on commit 77e5930ee90ed59db5101451e5dfac82810aa1c3, data reload: false
   
   run tpch-sf100 query with default conf and session variables
   q1	4756	4485	4572	4485
   q2	362	159	159	159
   q3	1455	1276	1198	1198
   q4	1112	922	919	919
   q5	3181	3168	3202	3168
   q6	241	128	126	126
   q7	1009	495	487	487
   q8	2225	2217	2224	2217
   q9	6744	6686	6699	6686
   q10	3206	3260	3273	3260
   q11	325	197	201	197
   q12	354	210	211	210
   q13	4581	3798	3783	3783
   q14	242	210	218	210
   q15	569	527	519	519
   q16	444	390	385	385
   q17	1025	658	594	594
   q18	7192	6958	6996	6958
   q19	1550	1491	1360	1360
   q20	577	318	286	286
   q21	3083	2668	2687	2668
   q22	353	280	289	280
   Total cold run time: 44586 ms
   Total hot run time: 40155 ms
   
   run tpch-sf100 query with default conf and set session variable runtime_filter_mode=off
   q1	4470	4417	4445	4417
   q2	266	162	165	162
   q3	3545	3536	3528	3528
   q4	2410	2395	2394	2394
   q5	5744	5730	5751	5730
   q6	241	120	121	120
   q7	2397	1859	1870	1859
   q8	3534	3545	3542	3542
   q9	9000	9010	8967	8967
   q10	3907	3953	4009	3953
   q11	502	389	395	389
   q12	764	578	584	578
   q13	4307	3596	3555	3555
   q14	294	257	255	255
   q15	564	522	529	522
   q16	520	470	480	470
   q17	1887	1836	1850	1836
   q18	8840	8342	8394	8342
   q19	1741	1773	1768	1768
   q20	2256	1940	1935	1935
   q21	6526	6170	6171	6170
   q22	499	427	435	427
   Total cold run time: 64214 ms
   Total hot run time: 60919 ms
   ```
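The TPC-H tables carry no column headers. Reading each row as query, cold run, hot run 1, hot run 2, and best hot run (all in ms) makes the totals check out. "Total cold run time" (44586 ms for the first table above) is the sum of the second column. "Total hot run time" (40155 ms) is the sum of the last column, and that column equals the smaller of the two hot runs in every row. This interpretation is inferred from the numbers and is not stated by the report. `TpchTotals` is a hypothetical helper that checks the interpretation.

```java
import java.util.List;

// Hypothetical helper; column meaning is inferred, not documented by the CI report.
final class TpchTotals {
    private TpchTotals() {
    }

    /** Each row: "qN\tcold\thot1\thot2\tbest", all times in ms. Returns {coldTotal, bestHotTotal}. */
    static long[] totals(List<String> rows) {
        long cold = 0;
        long bestHot = 0;
        for (String row : rows) {
            String[] f = row.trim().split("\t");
            long hot1 = Long.parseLong(f[2]);
            long hot2 = Long.parseLong(f[3]);
            long best = Long.parseLong(f[4]);
            if (best != Math.min(hot1, hot2)) {
                throw new IllegalArgumentException("best hot run is not min(hot1, hot2): " + row);
            }
            cold += Long.parseLong(f[1]);
            bestHot += best;
        }
        return new long[] {cold, bestHot};
    }
}
```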
   



Re: [PR] [feature](Load)(step2)support nereids load job schedule [doris]

Posted by "wsjz (via GitHub)" <gi...@apache.org>.
wsjz commented on PR #26356:
URL: https://github.com/apache/doris/pull/26356#issuecomment-1862002449

   run pipelinex_p0



Re: [PR] [feature](Load)(step2)support nereids load job schedule [doris]

Posted by "wsjz (via GitHub)" <gi...@apache.org>.
wsjz commented on code in PR #26356:
URL: https://github.com/apache/doris/pull/26356#discussion_r1434742971


##########
fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java:
##########
@@ -154,17 +201,18 @@ public List<T> commonCreateTasks(TaskType taskType, C taskContext) {
         return createTasks(taskType, taskContext);
     }
 
-    public void initTasks(List<? extends AbstractTask> tasks) {
+    public void initTasks(Collection<? extends T> tasks, TaskType taskType) {
+        if (CollectionUtils.isEmpty(getRunningTasks())) {
+            runningTasks = new ArrayList<>();
+        }
         tasks.forEach(task -> {
-            task.setJobId(jobId);
-            task.setTaskId(getNextId());
+            task.setTaskType(taskType);
+            task.setJobId(getJobId());

Review Comment:
   Set the task ID in the task constructor. I think all of these should be moved into the task's own constructor.
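Here is a minimal sketch of the direction described above. The task ID, job ID, and task type are assigned once in the constructor, so `initTasks` no longer needs to mutate tasks after creation and the fields can be `final`. The classes are hypothetical. The `AtomicLong` stands in for the FE-wide ID generator (`Env.getCurrentEnv().getNextId()` in this PR), and the task type is a plain `String` only to keep the sketch self-contained.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch; not the Doris AbstractTask/InsertTask classes.
abstract class SketchAbstractTask {
    // Stand-in for the FE-wide id generator.
    private static final AtomicLong ID_GENERATOR = new AtomicLong(1000);

    private final long taskId;
    private final long jobId;
    private final String taskType;

    protected SketchAbstractTask(long jobId, String taskType) {
        this.taskId = ID_GENERATOR.incrementAndGet(); // assigned exactly once, at construction
        this.jobId = jobId;
        this.taskType = taskType;
    }

    long getTaskId() {
        return taskId;
    }

    long getJobId() {
        return jobId;
    }

    String getTaskType() {
        return taskType;
    }
}

class SketchInsertTask extends SketchAbstractTask {
    private final String sql;

    SketchInsertTask(long jobId, String taskType, String sql) {
        super(jobId, taskType);
        this.sql = sql;
    }

    String getSql() {
        return sql;
    }
}
```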




Re: [PR] [feature](Load)(step2)support nereids load job schedule [doris]

Posted by "doris-robot (via GitHub)" <gi...@apache.org>.
doris-robot commented on PR #26356:
URL: https://github.com/apache/doris/pull/26356#issuecomment-1859422672

   
   TPC-H test result on machine: 'aliyun_ecs.c7a.8xlarge_32C64G'
   ```
   Tpch sf100 test result on commit db2cd09edf366d66eb55caae3f4fd6d5f0b49ea2, data reload: false
   
   run tpch-sf100 query with default conf and session variables
   q1	4729	4493	4491	4491
   q2	362	151	158	151
   q3	1458	1266	1218	1218
   q4	1122	918	927	918
   q5	3141	3136	3162	3136
   q6	243	127	126	126
   q7	983	481	481	481
   q8	2240	2224	2187	2187
   q9	6712	6685	6657	6657
   q10	3226	3271	3279	3271
   q11	312	203	203	203
   q12	348	209	203	203
   q13	4573	3786	3789	3786
   q14	245	212	210	210
   q15	564	521	524	521
   q16	442	383	384	383
   q17	1019	620	525	525
   q18	7359	6845	6947	6845
   q19	1531	1436	1412	1412
   q20	557	317	296	296
   q21	3048	2678	2684	2678
   q22	345	280	279	279
   Total cold run time: 44559 ms
   Total hot run time: 39977 ms
   
   run tpch-sf100 query with default conf and set session variable runtime_filter_mode=off
   q1	4397	4398	4414	4398
   q2	264	163	172	163
   q3	3534	3527	3519	3519
   q4	2383	2360	2374	2360
   q5	5711	5735	5742	5735
   q6	245	122	122	122
   q7	2376	1889	1892	1889
   q8	3520	3524	3525	3524
   q9	9005	8992	9001	8992
   q10	3927	4014	4012	4012
   q11	522	392	374	374
   q12	770	606	619	606
   q13	4305	3567	3554	3554
   q14	289	263	257	257
   q15	573	524	522	522
   q16	510	463	479	463
   q17	1861	1873	1834	1834
   q18	8652	8336	8332	8332
   q19	1748	1760	1738	1738
   q20	2257	1934	1932	1932
   q21	6511	6157	6131	6131
   q22	517	417	424	417
   Total cold run time: 63877 ms
   Total hot run time: 60874 ms
   ```
   



Re: [PR] [feature](Load)(step2)support nereids load job schedule [doris]

Posted by "doris-robot (via GitHub)" <gi...@apache.org>.
doris-robot commented on PR #26356:
URL: https://github.com/apache/doris/pull/26356#issuecomment-1859705328

   
   TPC-H test result on machine: 'aliyun_ecs.c7a.8xlarge_32C64G'
   ```
   Tpch sf100 test result on commit f962a67ed124134f9a3abef0f825939517ee7a04, data reload: false
   
   run tpch-sf100 query with default conf and session variables
   q1	4667	4442	4443	4442
   q2	358	156	160	156
   q3	1459	1236	1236	1236
   q4	1098	870	882	870
   q5	3086	3171	3179	3171
   q6	245	128	127	127
   q7	986	493	481	481
   q8	2195	2200	2164	2164
   q9	6706	6627	6672	6627
   q10	3206	3278	3235	3235
   q11	317	207	201	201
   q12	344	211	210	210
   q13	4591	3793	3835	3793
   q14	238	216	217	216
   q15	572	533	526	526
   q16	442	384	386	384
   q17	1015	664	553	553
   q18	7132	6950	6971	6950
   q19	1523	1441	1378	1378
   q20	508	306	298	298
   q21	3091	2615	2637	2615
   q22	344	277	281	277
   Total cold run time: 44123 ms
   Total hot run time: 39910 ms
   
   run tpch-sf100 query with default conf and set session variable runtime_filter_mode=off
   q1	4415	4358	4368	4358
   q2	266	161	171	161
   q3	3531	3520	3511	3511
   q4	2378	2364	2366	2364
   q5	5722	5723	5708	5708
   q6	240	121	120	120
   q7	2381	1865	1853	1853
   q8	3503	3504	3510	3504
   q9	8935	8897	8976	8897
   q10	3863	4003	3972	3972
   q11	506	373	395	373
   q12	768	597	602	597
   q13	4279	3573	3575	3573
   q14	280	259	258	258
   q15	565	519	526	519
   q16	500	475	467	467
   q17	1861	1839	1839	1839
   q18	8727	8337	8259	8259
   q19	1716	1750	1755	1750
   q20	2242	1949	1928	1928
   q21	6505	6190	6184	6184
   q22	506	419	420	419
   Total cold run time: 63689 ms
   Total hot run time: 60614 ms
   ```
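The totals in these reports can be reproduced as plain column sums. If the four timing columns are read as cold run, first hot run, second hot run, and best hot run, then "Total cold run time" is the sum of the first column and "Total hot run time" is the sum of the last. That column meaning is inferred from the numbers, since the last column always equals the smaller of the two hot runs, and is not documented by the robot. A small Java sketch of that reading follows. The `TpchTotals` class is illustrative only and is not part of the Doris tooling:

```java
import java.util.List;

public final class TpchTotals {
    private TpchTotals() {}

    /**
     * Returns {coldTotal, hotTotal} for rows shaped like "q1\t4667\t4442\t4443\t4442".
     * Assumption: column 1 is the cold run, columns 2 and 3 are hot runs,
     * and column 4 is the best (minimum) hot run.
     */
    public static long[] totals(List<String> rows) {
        long cold = 0;
        long hot = 0;
        for (String row : rows) {
            String[] cols = row.trim().split("\\s+");
            long coldRun = Long.parseLong(cols[1]);
            long hot1 = Long.parseLong(cols[2]);
            long hot2 = Long.parseLong(cols[3]);
            long best = Long.parseLong(cols[4]);
            // Sanity check for the inferred meaning of the last column.
            if (best != Math.min(hot1, hot2)) {
                throw new IllegalArgumentException("unexpected best column in: " + row);
            }
            cold += coldRun;
            hot += best;
        }
        return new long[] {cold, hot};
    }

    public static void main(String[] args) {
        List<String> rows = List.of(
                "q1\t4667\t4442\t4443\t4442",
                "q2\t358\t156\t160\t156");
        long[] t = totals(rows);
        System.out.println("cold=" + t[0] + " ms, hot=" + t[1] + " ms");
    }
}
```

Given the 22 default-session rows of the first table in this report (q1 to q22), `totals` returns 44123 and 39910. These match the reported cold and hot totals.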
   


Re: [PR] [feature](Load)(step2)support nereids load job schedule [doris]

Posted by "doris-robot (via GitHub)" <gi...@apache.org>.
doris-robot commented on PR #26356:
URL: https://github.com/apache/doris/pull/26356#issuecomment-1860237208

   
   TPC-H test result on machine: 'aliyun_ecs.c7a.8xlarge_32C64G'
   ```
   Tpch sf100 test result on commit 6004f9eafd4d55c89a19d1fa519deb1d5cd9bb5d, data reload: false
   
   run tpch-sf100 query with default conf and session variables
   q1	4715	4459	4497	4459
   q2	374	142	158	142
   q3	1458	1268	1240	1240
   q4	1113	909	931	909
   q5	3123	3146	3174	3146
   q6	248	124	129	124
   q7	996	478	487	478
   q8	2201	2228	2186	2186
   q9	6720	6659	6687	6659
   q10	3205	3291	3258	3258
   q11	317	207	204	204
   q12	357	209	204	204
   q13	4549	3805	3817	3805
   q14	242	213	213	213
   q15	565	527	548	527
   q16	438	388	384	384
   q17	1019	691	581	581
   q18	7157	6904	6945	6904
   q19	1534	1419	1385	1385
   q20	517	282	292	282
   q21	3008	2633	2674	2633
   q22	350	272	278	272
   Total cold run time: 44206 ms
   Total hot run time: 39995 ms
   
   run tpch-sf100 query with default conf and set session variable runtime_filter_mode=off
   q1	4406	4390	4392	4390
   q2	266	162	172	162
   q3	3538	3524	3536	3524
   q4	2384	2378	2377	2377
   q5	5734	5734	5701	5701
   q6	240	123	120	120
   q7	2385	1864	1900	1864
   q8	3521	3538	3520	3520
   q9	9040	9036	8981	8981
   q10	3912	3976	3999	3976
   q11	512	372	388	372
   q12	755	586	603	586
   q13	4280	3567	3582	3567
   q14	291	257	260	257
   q15	562	512	526	512
   q16	497	461	446	446
   q17	1870	1851	1876	1851
   q18	8623	8277	8160	8160
   q19	1742	1764	1746	1746
   q20	2272	1942	1952	1942
   q21	6524	6165	6183	6165
   q22	498	417	415	415
   Total cold run time: 63852 ms
   Total hot run time: 60634 ms
   ```
   


Re: [PR] [feature](Load)(step2)support nereids load job schedule [doris]

Posted by "shuke987 (via GitHub)" <gi...@apache.org>.
shuke987 commented on PR #26356:
URL: https://github.com/apache/doris/pull/26356#issuecomment-1859500959

   run buildall


Re: [PR] [feature](Load)(step2)support nereids load job schedule [doris]

Posted by "wsjz (via GitHub)" <gi...@apache.org>.
wsjz commented on PR #26356:
URL: https://github.com/apache/doris/pull/26356#issuecomment-1859394343

   run buildall


Re: [PR] [feature](Load)(step2)support nereids load job schedule [doris]

Posted by "wsjz (via GitHub)" <gi...@apache.org>.
wsjz commented on code in PR #26356:
URL: https://github.com/apache/doris/pull/26356#discussion_r1434742971


##########
fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java:
##########
@@ -154,17 +201,18 @@ public List<T> commonCreateTasks(TaskType taskType, C taskContext) {
         return createTasks(taskType, taskContext);
     }
 
-    public void initTasks(List<? extends AbstractTask> tasks) {
+    public void initTasks(Collection<? extends T> tasks, TaskType taskType) {
+        if (CollectionUtils.isEmpty(getRunningTasks())) {
+            runningTasks = new ArrayList<>();
+        }
         tasks.forEach(task -> {
-            task.setJobId(jobId);
-            task.setTaskId(getNextId());
+            task.setTaskType(taskType);
+            task.setJobId(getJobId());

Review Comment:
   set the task id in the task constructor; I think all of these should be put into the task's own constructor, not set elsewhere
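The sketch below shows the pattern this reply describes: each task gets its id in its own constructor, and `initTasks` only stamps job-level fields. `TaskIdSketch`, `SketchTask`, `SketchJob`, the `AtomicLong` id source, and the `TaskType` values are hypothetical stand-ins, not the real `AbstractTask`/`AbstractJob` API. Adding each task to `runningTasks` is also an assumption, because the rest of the method body is not shown in the diff.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

public final class TaskIdSketch {
    // Hypothetical stand-in for the real TaskType enum.
    enum TaskType { SCHEDULED, MANUAL }

    // Hypothetical id source; the real code may use a global id generator instead.
    private static final AtomicLong ID_GENERATOR = new AtomicLong();

    static class SketchTask {
        private final long taskId;
        private long jobId;
        private TaskType taskType;

        SketchTask() {
            // The task owns its identity: the id is assigned once, at construction.
            this.taskId = ID_GENERATOR.incrementAndGet();
        }

        long getTaskId() { return taskId; }
        long getJobId() { return jobId; }
        TaskType getTaskType() { return taskType; }
        void setJobId(long jobId) { this.jobId = jobId; }
        void setTaskType(TaskType taskType) { this.taskType = taskType; }
    }

    static class SketchJob {
        private final long jobId;
        private final List<SketchTask> runningTasks = new ArrayList<>();

        SketchJob(long jobId) { this.jobId = jobId; }

        // Mirrors the shape of initTasks(Collection, TaskType): no setTaskId call
        // is needed because ids already come from the task constructor.
        void initTasks(Collection<? extends SketchTask> tasks, TaskType taskType) {
            for (SketchTask task : tasks) {
                task.setTaskType(taskType);
                task.setJobId(jobId);
                runningTasks.add(task); // assumption: tasks are tracked as running
            }
        }

        List<SketchTask> getRunningTasks() { return runningTasks; }
    }

    public static void main(String[] args) {
        SketchJob job = new SketchJob(42L);
        job.initTasks(List.of(new SketchTask(), new SketchTask()), TaskType.MANUAL);
        for (SketchTask t : job.getRunningTasks()) {
            System.out.println("task " + t.getTaskId() + " -> job " + t.getJobId()
                    + " (" + t.getTaskType() + ")");
        }
    }
}
```

Assigning the id in the constructor means a task can never be observed without an id. It also removes the ordering dependency on `initTasks` being called before the id is read.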



Re: [PR] [feature](Load)(step2)support nereids load job schedule [doris]

Posted by "zddr (via GitHub)" <gi...@apache.org>.
zddr commented on code in PR #26356:
URL: https://github.com/apache/doris/pull/26356#discussion_r1434740764


##########
fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java:
##########
@@ -154,17 +201,18 @@ public List<T> commonCreateTasks(TaskType taskType, C taskContext) {
         return createTasks(taskType, taskContext);
     }
 
-    public void initTasks(List<? extends AbstractTask> tasks) {
+    public void initTasks(Collection<? extends T> tasks, TaskType taskType) {
+        if (CollectionUtils.isEmpty(getRunningTasks())) {
+            runningTasks = new ArrayList<>();
+        }
         tasks.forEach(task -> {
-            task.setJobId(jobId);
-            task.setTaskId(getNextId());
+            task.setTaskType(taskType);
+            task.setJobId(getJobId());

Review Comment:
   why remove `task.setTaskId(getNextId());`



Re: [PR] [feature](Load)(step2)support nereids load job schedule [doris]

Posted by "zddr (via GitHub)" <gi...@apache.org>.
zddr commented on code in PR #26356:
URL: https://github.com/apache/doris/pull/26356#discussion_r1434740764


##########
fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java:
##########
@@ -154,17 +201,18 @@ public List<T> commonCreateTasks(TaskType taskType, C taskContext) {
         return createTasks(taskType, taskContext);
     }
 
-    public void initTasks(List<? extends AbstractTask> tasks) {
+    public void initTasks(Collection<? extends T> tasks, TaskType taskType) {
+        if (CollectionUtils.isEmpty(getRunningTasks())) {
+            runningTasks = new ArrayList<>();
+        }
         tasks.forEach(task -> {
-            task.setJobId(jobId);
-            task.setTaskId(getNextId());
+            task.setTaskType(taskType);
+            task.setJobId(getJobId());

Review Comment:
   why remove `task.setTaskId(getNextId());`



Re: [PR] [feature](Load)(step2)support nereids load job schedule [doris]

Posted by "wsjz (via GitHub)" <gi...@apache.org>.
wsjz commented on code in PR #26356:
URL: https://github.com/apache/doris/pull/26356#discussion_r1434217621


##########
fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/load/LabelProcessor.java:
##########
@@ -0,0 +1,181 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.jobs.load;
+
+import org.apache.doris.catalog.Database;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.CaseSensibility;
+import org.apache.doris.common.LabelAlreadyUsedException;
+import org.apache.doris.common.PatternMatcher;
+import org.apache.doris.common.PatternMatcherWrapper;
+import org.apache.doris.job.exception.JobException;
+import org.apache.doris.job.extensions.insert.InsertJob;
+
+import com.google.common.base.Strings;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+
+/**
+ * label manager
+ */
+public class LabelProcessor {
+    private final Map<Long, Map<String, List<InsertJob>>> dbIdToLabelToLoadJobs = new ConcurrentHashMap<>();
+    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
+
+    private void readLock() {
+        lock.readLock().lock();
+    }
+
+    private void readUnlock() {
+        lock.readLock().unlock();
+    }
+
+    private void writeLock() {
+        lock.writeLock().lock();
+    }
+
+    private void writeUnlock() {
+        lock.writeLock().unlock();
+    }
+
+    /**
+     * get jobs with label
+     * @param db db
+     * @return jobs
+     * @throws JobException e
+     */
+    public List<InsertJob> getJobs(Database db) throws JobException {
+        readLock();
+        try {
+            Map<String, List<InsertJob>> labelToLoadJobs = dbIdToLabelToLoadJobs.get(db.getId());
+            if (labelToLoadJobs == null) {
+                throw new JobException("Load job does not exist");
+            }
+            return labelToLoadJobs.values().stream().flatMap(Collection::stream).collect(Collectors.toList());
+        } finally {
+            readUnlock();
+        }
+    }
+
+    /**
+     * add job with label
+     *
+     * @param job job with label
+     * @throws LabelAlreadyUsedException e
+     */
+    public void addJob(InsertJob job) throws LabelAlreadyUsedException {
+        writeLock();
+        try {
+            Map<String, List<InsertJob>> labelToLoadJobs;
+            if (!dbIdToLabelToLoadJobs.containsKey(job.getDbId())) {
+                labelToLoadJobs = new ConcurrentHashMap<>();
+                dbIdToLabelToLoadJobs.put(job.getDbId(), labelToLoadJobs);
+            }
+            labelToLoadJobs = dbIdToLabelToLoadJobs.get(job.getDbId());
+            if (labelToLoadJobs.containsKey(job.getLabelName())) {
+                throw new LabelAlreadyUsedException(job.getLabelName());
+            } else {
+                labelToLoadJobs.put(job.getLabelName(), new ArrayList<>());
+            }
+            labelToLoadJobs.get(job.getLabelName()).add(job);
+        } finally {
+            writeUnlock();
+        }
+    }
+
+    /**
+     * support remove label job
+     * @param dbId db id
+     * @param labelName label name
+     */
+    public void removeJob(long dbId, String labelName) {

Review Comment:
   TODO: for the method that cleans old labels
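The check-then-put sequence in `addJob` can also be written with `Map.computeIfAbsent`. The write lock is still held, so the duplicate-label check and the insertion stay atomic with respect to other writers. Below is a minimal sketch. It assumes a hypothetical `Job` class in place of `InsertJob` and uses `IllegalStateException` in place of `LabelAlreadyUsedException`/`JobException`:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;

public final class LabelRegistrySketch {
    // Hypothetical stand-in for InsertJob: only the fields the label index needs.
    static final class Job {
        final long dbId;
        final String label;

        Job(long dbId, String label) {
            this.dbId = dbId;
            this.label = label;
        }
    }

    private final Map<Long, Map<String, List<Job>>> dbIdToLabelToJobs = new ConcurrentHashMap<>();
    // Fair lock, as in LabelProcessor.
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);

    /** Registers a job, rejecting a label that is already used in the same database. */
    public void addJob(Job job) {
        lock.writeLock().lock();
        try {
            Map<String, List<Job>> labelToJobs =
                    dbIdToLabelToJobs.computeIfAbsent(job.dbId, id -> new ConcurrentHashMap<>());
            if (labelToJobs.containsKey(job.label)) {
                // LabelProcessor throws LabelAlreadyUsedException here.
                throw new IllegalStateException("Label already used: " + job.label);
            }
            List<Job> jobs = new ArrayList<>();
            jobs.add(job);
            labelToJobs.put(job.label, jobs);
        } finally {
            lock.writeLock().unlock();
        }
    }

    /** Returns every job registered for a database. */
    public List<Job> getJobs(long dbId) {
        lock.readLock().lock();
        try {
            Map<String, List<Job>> labelToJobs = dbIdToLabelToJobs.get(dbId);
            if (labelToJobs == null) {
                // LabelProcessor throws JobException("Load job does not exist") here.
                throw new IllegalStateException("Load job does not exist");
            }
            return labelToJobs.values().stream()
                    .flatMap(Collection::stream)
                    .collect(Collectors.toList());
        } finally {
            lock.readLock().unlock();
        }
    }
}
```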



Re: [PR] [feature](Load)(step2)support nereids load job schedule [doris]

Posted by "morningman (via GitHub)" <gi...@apache.org>.
morningman commented on code in PR #26356:
URL: https://github.com/apache/doris/pull/26356#discussion_r1434185522


##########
fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/load/LabelProcessor.java:
##########
@@ -0,0 +1,181 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.jobs.load;
+
+import org.apache.doris.catalog.Database;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.CaseSensibility;
+import org.apache.doris.common.LabelAlreadyUsedException;
+import org.apache.doris.common.PatternMatcher;
+import org.apache.doris.common.PatternMatcherWrapper;
+import org.apache.doris.job.exception.JobException;
+import org.apache.doris.job.extensions.insert.InsertJob;
+
+import com.google.common.base.Strings;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+
+/**
+ * label manager
+ */
+public class LabelProcessor {
+    private final Map<Long, Map<String, List<InsertJob>>> dbIdToLabelToLoadJobs = new ConcurrentHashMap<>();
+    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
+
+    private void readLock() {
+        lock.readLock().lock();
+    }
+
+    private void readUnlock() {
+        lock.readLock().unlock();
+    }
+
+    private void writeLock() {
+        lock.writeLock().lock();
+    }
+
+    private void writeUnlock() {
+        lock.writeLock().unlock();
+    }
+
+    /**
+     * get jobs with label
+     * @param db db
+     * @return jobs
+     * @throws JobException e
+     */
+    public List<InsertJob> getJobs(Database db) throws JobException {
+        readLock();
+        try {
+            Map<String, List<InsertJob>> labelToLoadJobs = dbIdToLabelToLoadJobs.get(db.getId());
+            if (labelToLoadJobs == null) {
+                throw new JobException("Load job does not exist");
+            }
+            return labelToLoadJobs.values().stream().flatMap(Collection::stream).collect(Collectors.toList());
+        } finally {
+            readUnlock();
+        }
+    }
+
+    /**
+     * add job with label
+     *
+     * @param job job with label
+     * @throws LabelAlreadyUsedException e
+     */
+    public void addJob(InsertJob job) throws LabelAlreadyUsedException {
+        writeLock();
+        try {
+            Map<String, List<InsertJob>> labelToLoadJobs;
+            if (!dbIdToLabelToLoadJobs.containsKey(job.getDbId())) {
+                labelToLoadJobs = new ConcurrentHashMap<>();
+                dbIdToLabelToLoadJobs.put(job.getDbId(), labelToLoadJobs);
+            }
+            labelToLoadJobs = dbIdToLabelToLoadJobs.get(job.getDbId());
+            if (labelToLoadJobs.containsKey(job.getLabelName())) {
+                throw new LabelAlreadyUsedException(job.getLabelName());
+            } else {
+                labelToLoadJobs.put(job.getLabelName(), new ArrayList<>());
+            }
+            labelToLoadJobs.get(job.getLabelName()).add(job);
+        } finally {
+            writeUnlock();
+        }
+    }
+
+    /**
+     * support remove label job
+     * @param dbId db id
+     * @param labelName label name
+     */
+    public void removeJob(long dbId, String labelName) {

Review Comment:
   Not used?
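Per the author's TODO reply elsewhere in this thread, `removeJob` is meant for a future method that cleans old labels. The sketch below shows one hypothetical shape for such a caller. Several parts are assumptions rather than code from this PR: the finish-time field, the cutoff rule, `cleanOldLabels`, and the use of `synchronized` instead of the read/write lock.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public final class LabelCleanerSketch {
    // Hypothetical job view: a label plus the time the job finished (-1 while still running).
    static final class Job {
        final String label;
        final long finishTimeMs;

        Job(String label, long finishTimeMs) {
            this.label = label;
            this.finishTimeMs = finishTimeMs;
        }

        boolean finishedBefore(long cutoffMs) {
            return finishTimeMs >= 0 && finishTimeMs < cutoffMs;
        }
    }

    private final Map<Long, Map<String, List<Job>>> dbIdToLabelToJobs = new ConcurrentHashMap<>();

    synchronized void addJob(long dbId, Job job) {
        dbIdToLabelToJobs.computeIfAbsent(dbId, id -> new ConcurrentHashMap<>())
                .computeIfAbsent(job.label, label -> new ArrayList<>())
                .add(job);
    }

    /** Same parameters as LabelProcessor.removeJob(long dbId, String labelName). */
    synchronized void removeJob(long dbId, String labelName) {
        Map<String, List<Job>> labelToJobs = dbIdToLabelToJobs.get(dbId);
        if (labelToJobs == null) {
            return;
        }
        labelToJobs.remove(labelName);
        if (labelToJobs.isEmpty()) {
            dbIdToLabelToJobs.remove(dbId);
        }
    }

    /** Hypothetical caller: drops labels whose jobs all finished before cutoffMs; returns how many. */
    synchronized int cleanOldLabels(long cutoffMs) {
        List<Map.Entry<Long, String>> expired = new ArrayList<>();
        dbIdToLabelToJobs.forEach((dbId, labelToJobs) ->
                labelToJobs.forEach((label, jobs) -> {
                    if (jobs.stream().allMatch(job -> job.finishedBefore(cutoffMs))) {
                        expired.add(Map.entry(dbId, label));
                    }
                }));
        // Collect first, then remove, so the maps are not modified while being traversed.
        for (Map.Entry<Long, String> e : expired) {
            removeJob(e.getKey(), e.getValue());
        }
        return expired.size();
    }

    synchronized int labelCount(long dbId) {
        Map<String, List<Job>> labelToJobs = dbIdToLabelToJobs.get(dbId);
        return labelToJobs == null ? 0 : labelToJobs.size();
    }
}
```

Labels of running jobs (finish time -1) are never expired by this rule. As a result, a label stays reserved for as long as any job using it is still in progress.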



Re: [PR] [feature](Load)(step2)support nereids load job schedule [doris]

Posted by "wsjz (via GitHub)" <gi...@apache.org>.
wsjz commented on PR #26356:
URL: https://github.com/apache/doris/pull/26356#issuecomment-1868799775

   run buildall


Re: [PR] [feature](Load)(step2)support nereids load job schedule [doris]

Posted by "doris-robot (via GitHub)" <gi...@apache.org>.
doris-robot commented on PR #26356:
URL: https://github.com/apache/doris/pull/26356#issuecomment-1855435006

   (From new machine) TeamCity pipeline, clickbench performance test result:
    the sum of best hot time: 44.31 seconds
    stream load tsv:          588 seconds loaded 74807831229 Bytes, about 121 MB/s
    stream load json:         19 seconds loaded 2358488459 Bytes, about 118 MB/s
    stream load orc:          67 seconds loaded 1101869774 Bytes, about 15 MB/s
    stream load parquet:      33 seconds loaded 861443392 Bytes, about 24 MB/s
    insert into select:       28.9 seconds inserted 10000000 Rows, about 346K ops/s
    storage size: 17219723249 Bytes
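The MB/s figures in this report are consistent with a specific calculation: divide bytes by seconds, divide again by 2^20 (so "MB" here behaves like MiB), and truncate rather than round. For example, 861443392 B / 33 s is about 24.9 MiB/s and is reported as 24. The Java check below tests that reading. The formula is inferred from the numbers, not taken from the pipeline's source.

```java
public final class ThroughputCheck {
    private ThroughputCheck() {}

    /** Bytes per second converted to MiB/s, truncated toward zero. */
    static long mibPerSecond(long bytes, long seconds) {
        return bytes / seconds / (1024L * 1024L);
    }

    /** Rows per second in thousands, truncated toward zero. */
    static long kiloRowsPerSecond(long rows, double seconds) {
        return (long) (rows / seconds / 1000.0);
    }

    public static void main(String[] args) {
        System.out.println("tsv:     " + mibPerSecond(74807831229L, 588) + " MB/s");
        System.out.println("json:    " + mibPerSecond(2358488459L, 19) + " MB/s");
        System.out.println("orc:     " + mibPerSecond(1101869774L, 67) + " MB/s");
        System.out.println("parquet: " + mibPerSecond(861443392L, 33) + " MB/s");
        System.out.println("insert:  " + kiloRowsPerSecond(10000000L, 28.9) + "K ops/s");
    }
}
```

Running it prints 121, 118, 15, 24 and 346K, which matches every figure in the report. Rounding instead of truncating would give 16 for ORC and 25 for Parquet.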


Re: [PR] [feature](Load)(step2)support nereids load job schedule [doris]

Posted by "wsjz (via GitHub)" <gi...@apache.org>.
wsjz commented on code in PR #26356:
URL: https://github.com/apache/doris/pull/26356#discussion_r1424866033


##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertIntoTableCommand.java:
##########
@@ -101,6 +105,14 @@ public void setJobId(long jobId) {
 
     @Override
     public void run(ConnectContext ctx, StmtExecutor executor) throws Exception {
+        runInternal(ctx, executor);
+    }
+
+    public void statefulRun(ConnectContext ctx, StmtExecutor executor) throws Exception {

Review Comment:
   > Looks like `statefulRun()` is same as `run()`?
   
   Changed it to `void runWithStatistic(ConnectContext ctx, StmtExecutor executor, LoadStatistic loadStatistic);`
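The sketch below illustrates the split this reply describes. `run()` keeps its plain signature and delegates to a shared internal method. `runWithStatistic(...)` passes a statistics holder through, so the two entry points are no longer identical. `ConnectContext` and `StmtExecutor` are replaced by hypothetical placeholder types. The `LoadStatistic` fields and the fixed row and byte counts are assumptions, not the actual Doris class or behavior.

```java
import java.util.concurrent.atomic.AtomicLong;

public final class InsertCommandSketch {
    // Hypothetical placeholders for ConnectContext and StmtExecutor.
    static final class Context {}
    static final class Executor {}

    // Hypothetical statistics holder; the real LoadStatistic may track different fields.
    static final class LoadStatistic {
        final AtomicLong loadedRows = new AtomicLong();
        final AtomicLong loadedBytes = new AtomicLong();
    }

    /** Plain entry point: no statistics are collected. */
    public void run(Context ctx, Executor executor) {
        runInternal(ctx, executor, null);
    }

    /** Entry point for scheduled load tasks that need progress statistics. */
    public void runWithStatistic(Context ctx, Executor executor, LoadStatistic statistic) {
        runInternal(ctx, executor, statistic);
    }

    private void runInternal(Context ctx, Executor executor, LoadStatistic statistic) {
        // Placeholder for planning and executing the INSERT; pretend it wrote 3 rows / 30 bytes.
        long rows = 3;
        long bytes = 30;
        if (statistic != null) {
            statistic.loadedRows.addAndGet(rows);
            statistic.loadedBytes.addAndGet(bytes);
        }
    }
}
```

Keeping one `runInternal` avoids duplicating the execution path. The only behavioral difference between the two public methods is whether a statistics sink is supplied.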



Re: [PR] [feature](Load)(step2)support nereids load job schedule [doris]

Posted by "wsjz (via GitHub)" <gi...@apache.org>.
wsjz commented on PR #26356:
URL: https://github.com/apache/doris/pull/26356#issuecomment-1851170290

   run buildall


Re: [PR] [feature](Load)(step2)support nereids load job schedule [doris]

Posted by "wsjz (via GitHub)" <gi...@apache.org>.
wsjz commented on code in PR #26356:
URL: https://github.com/apache/doris/pull/26356#discussion_r1434742971


##########
fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java:
##########
@@ -154,17 +201,18 @@ public List<T> commonCreateTasks(TaskType taskType, C taskContext) {
         return createTasks(taskType, taskContext);
     }
 
-    public void initTasks(List<? extends AbstractTask> tasks) {
+    public void initTasks(Collection<? extends T> tasks, TaskType taskType) {
+        if (CollectionUtils.isEmpty(getRunningTasks())) {
+            runningTasks = new ArrayList<>();
+        }
         tasks.forEach(task -> {
-            task.setJobId(jobId);
-            task.setTaskId(getNextId());
+            task.setTaskType(taskType);
+            task.setJobId(getJobId());

Review Comment:
   set the task id in the task constructor; I think all of these should be put into the task's own constructor, not elsewhere



Re: [PR] [feature](Load)(step2)support nereids load job schedule [doris]

Posted by "wsjz (via GitHub)" <gi...@apache.org>.
wsjz commented on code in PR #26356:
URL: https://github.com/apache/doris/pull/26356#discussion_r1434742971


##########
fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java:
##########
@@ -154,17 +201,18 @@ public List<T> commonCreateTasks(TaskType taskType, C taskContext) {
         return createTasks(taskType, taskContext);
     }
 
-    public void initTasks(List<? extends AbstractTask> tasks) {
+    public void initTasks(Collection<? extends T> tasks, TaskType taskType) {
+        if (CollectionUtils.isEmpty(getRunningTasks())) {
+            runningTasks = new ArrayList<>();
+        }
         tasks.forEach(task -> {
-            task.setJobId(jobId);
-            task.setTaskId(getNextId());
+            task.setTaskType(taskType);
+            task.setJobId(getJobId());

Review Comment:
   set the task id in the task constructor



Re: [PR] [feature](Load)(step2)support nereids load job schedule [doris]

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #26356:
URL: https://github.com/apache/doris/pull/26356#issuecomment-1867246300

   PR approved by anyone and no changes requested.


Re: [PR] [feature](Load)(step2)support nereids load job schedule [doris]

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #26356:
URL: https://github.com/apache/doris/pull/26356#issuecomment-1868926386

   PR approved by at least one committer and no changes requested.


Re: [PR] [feature](Load)(step2)support nereids load job schedule [doris]

Posted by "doris-robot (via GitHub)" <gi...@apache.org>.
doris-robot commented on PR #26356:
URL: https://github.com/apache/doris/pull/26356#issuecomment-1868834531

   
   TPC-H test result on machine: 'aliyun_ecs.c7a.8xlarge_32C64G'
   ```
   Tpch sf100 test result on commit be4617365b39cf5059ec0b65912908484d93fb66, data reload: false
   
   run tpch-sf100 query with default conf and session variables
   q1	4713	4409	4409	4409
   q2	369	142	162	142
   q3	1466	1278	1277	1277
   q4	1101	852	867	852
   q5	3169	3159	3182	3159
   q6	255	129	131	129
   q7	977	476	492	476
   q8	2169	2223	2186	2186
   q9	6707	6663	6662	6662
   q10	3224	3268	3287	3268
   q11	310	190	183	183
   q12	359	210	207	207
   q13	4554	3780	3790	3780
   q14	234	209	211	209
   q15	571	522	536	522
   q16	446	388	392	388
   q17	1022	663	575	575
   q18	7068	6748	6872	6748
   q19	1548	1414	1376	1376
   q20	525	316	321	316
   q21	3098	2675	2647	2647
   q22	351	281	288	281
   Total cold run time: 44236 ms
   Total hot run time: 39792 ms
   
   run tpch-sf100 query with default conf and set session variable runtime_filter_mode=off
   q1	4368	4326	4313	4313
   q2	273	175	176	175
   q3	3519	3510	3525	3510
   q4	2403	2382	2382	2382
   q5	5714	5739	5739	5739
   q6	246	123	126	123
   q7	2358	1848	1858	1848
   q8	3533	3553	3542	3542
   q9	9047	9003	8979	8979
   q10	3907	4007	3991	3991
   q11	495	377	368	368
   q12	771	582	617	582
   q13	4301	3554	3554	3554
   q14	285	248	247	247
   q15	578	528	525	525
   q16	485	442	462	442
   q17	1892	1876	1867	1867
   q18	8525	8144	8081	8081
   q19	1761	1743	1740	1740
   q20	2249	1962	1933	1933
   q21	6508	6139	6173	6139
   q22	518	445	397	397
   Total cold run time: 63736 ms
   Total hot run time: 60477 ms
   ```
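   The report does not label its columns. The numbers are consistent with this layout: query, cold run, first hot run, second hot run, and the faster of the two hot runs, all in milliseconds. "Total cold run time" is the sum of the cold-run column and "Total hot run time" is the sum of the last column. The Java sketch below assumes that inferred layout (the class and method names are hypothetical). It recomputes both totals for the default-session run above and checks the min relationship on every row.

```java
import java.util.List;

// Sketch under an inferred column layout (not labeled in the bot report):
// query, cold run, hot run 1, hot run 2, best hot run (all ms).
public class TpchTotals {
    public static long[] totals(List<String> rows) {
        long cold = 0;
        long hot = 0;
        for (String row : rows) {
            String[] f = row.trim().split("\\s+");
            long coldRun = Long.parseLong(f[1]);
            long hot1 = Long.parseLong(f[2]);
            long hot2 = Long.parseLong(f[3]);
            long best = Long.parseLong(f[4]);
            // The last column should be the faster of the two hot runs.
            if (best != Math.min(hot1, hot2)) {
                throw new IllegalStateException("unexpected row: " + row);
            }
            cold += coldRun;
            hot += best;
        }
        return new long[] {cold, hot};
    }

    public static void main(String[] args) {
        // Rows copied from the default-session-variable run above.
        List<String> rows = List.of(
            "q1 4713 4409 4409 4409", "q2 369 142 162 142",
            "q3 1466 1278 1277 1277", "q4 1101 852 867 852",
            "q5 3169 3159 3182 3159", "q6 255 129 131 129",
            "q7 977 476 492 476", "q8 2169 2223 2186 2186",
            "q9 6707 6663 6662 6662", "q10 3224 3268 3287 3268",
            "q11 310 190 183 183", "q12 359 210 207 207",
            "q13 4554 3780 3790 3780", "q14 234 209 211 209",
            "q15 571 522 536 522", "q16 446 388 392 388",
            "q17 1022 663 575 575", "q18 7068 6748 6872 6748",
            "q19 1548 1414 1376 1376", "q20 525 316 321 316",
            "q21 3098 2675 2647 2647", "q22 351 281 288 281");
        long[] t = totals(rows);
        System.out.println("Total cold run time: " + t[0] + " ms");
        System.out.println("Total hot run time: " + t[1] + " ms");
    }
}
```

   Running it prints 44236 ms and 39792 ms, matching the totals in the report.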
   
Re: [PR] [feature](Load)(step2)support nereids load job schedule [doris]

Posted by "doris-robot (via GitHub)" <gi...@apache.org>.
doris-robot commented on PR #26356:
URL: https://github.com/apache/doris/pull/26356#issuecomment-1862091444

   
   TPC-H test result on machine: 'aliyun_ecs.c7a.8xlarge_32C64G'
   ```
   Tpch sf100 test result on commit 6004f9eafd4d55c89a19d1fa519deb1d5cd9bb5d, data reload: false
   
   run tpch-sf100 query with default conf and session variables
   q1	4646	4451	4465	4451
   q2	359	152	160	152
   q3	1475	1250	1202	1202
   q4	1113	876	886	876
   q5	3142	3130	3164	3130
   q6	246	129	130	129
   q7	996	499	481	481
   q8	2166	2231	2177	2177
   q9	6682	6630	6665	6630
   q10	3218	3273	3286	3273
   q11	309	177	189	177
   q12	358	210	204	204
   q13	4549	3785	3791	3785
   q14	241	210	218	210
   q15	573	524	520	520
   q16	437	380	384	380
   q17	1024	626	606	606
   q18	7267	6947	6960	6947
   q19	1577	1412	1372	1372
   q20	501	304	286	286
   q21	3042	2639	2695	2639
   q22	360	280	286	280
   Total cold run time: 44281 ms
   Total hot run time: 39907 ms
   
   run tpch-sf100 query with default conf and set session variable runtime_filter_mode=off
   q1	4343	4362	4354	4354
   q2	266	163	174	163
   q3	3546	3529	3527	3527
   q4	2386	2376	2360	2360
   q5	5728	5771	5756	5756
   q6	240	121	123	121
   q7	2379	1903	1870	1870
   q8	3523	3533	3536	3533
   q9	9012	9017	8988	8988
   q10	3915	4004	4025	4004
   q11	492	380	369	369
   q12	764	593	597	593
   q13	4302	3578	3556	3556
   q14	281	258	264	258
   q15	567	519	527	519
   q16	504	463	500	463
   q17	1859	1866	1854	1854
   q18	8740	8202	8387	8202
   q19	1767	1807	1817	1807
   q20	2259	1947	1942	1942
   q21	6522	6206	6181	6181
   q22	506	427	442	427
   Total cold run time: 63901 ms
   Total hot run time: 60847 ms
   ```
   