Posted to commits@doris.apache.org by "wsjz (via GitHub)" <gi...@apache.org> on 2023/11/03 02:58:23 UTC
[PR] [feature](Load)(step2)support nereids load job schedule [doris]
wsjz opened a new pull request, #26356:
URL: https://github.com/apache/doris/pull/26356
## Proposed changes
We will integrate the new load job manager into the new job scheduling framework, so that the INSERT INTO task can be scheduled after the broker load SQL has been converted to INSERT INTO TVF (table-valued function) SQL.
issue: https://github.com/apache/doris/issues/24221
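To make the conversion described above concrete, here is a minimal Java sketch of how a single broker load data description might be rewritten into an INSERT INTO ... SELECT over the `s3()` table-valued function. The `DataDesc` record, the `toInsertSql` helper and the choice of the `s3()` TVF are illustrative assumptions, not the code in this PR. The property names (`uri`, `format`, `column_separator`) are meant to follow the Doris `s3()` TVF but should be checked against the documentation for your version, and credential/endpoint properties are omitted.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class BrokerLoadToTvf {
    // Hypothetical, simplified view of one broker load data description.
    public record DataDesc(String table, String fileUri, String format, String columnSeparator) {}

    // Wrap a value in double quotes, escaping embedded quotes with a backslash.
    static String quote(String s) {
        return "\"" + s.replace("\"", "\\\"") + "\"";
    }

    // Build "INSERT INTO <table> SELECT * FROM s3(...)" from the description.
    // Credentials and endpoint properties are deliberately left out of this sketch.
    public static String toInsertSql(DataDesc desc) {
        Map<String, String> props = new LinkedHashMap<>(); // keeps property order stable
        props.put("uri", desc.fileUri());
        props.put("format", desc.format());
        if (desc.columnSeparator() != null) {
            props.put("column_separator", desc.columnSeparator());
        }
        String args = props.entrySet().stream()
                .map(e -> quote(e.getKey()) + " = " + quote(e.getValue()))
                .collect(Collectors.joining(", "));
        return "INSERT INTO " + desc.table() + " SELECT * FROM s3(" + args + ")";
    }

    public static void main(String[] args) {
        DataDesc desc = new DataDesc("my_table", "s3://bucket/path/file.csv", "csv", ",");
        System.out.println(toInsertSql(desc));
    }
}
```

For the sample in `main`, this prints `INSERT INTO my_table SELECT * FROM s3("uri" = "s3://bucket/path/file.csv", "format" = "csv", "column_separator" = ",")`. Building the argument list from a `LinkedHashMap` keeps the property order stable, which makes the generated SQL easy to compare in tests.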
## Further comments
If this is a relatively large or complex change, kick off the discussion at [dev@doris.apache.org](mailto:dev@doris.apache.org) by explaining why you chose the solution you did, what alternatives you considered, etc.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org
Re: [PR] [feature](Load)(step2)support nereids load job schedule [doris]
Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #26356:
URL: https://github.com/apache/doris/pull/26356#issuecomment-1867246267
PR approved by at least one committer and no changes requested.
Re: [PR] [feature](Load)(step2)support nereids load job schedule [doris]
Posted by "wsjz (via GitHub)" <gi...@apache.org>.
wsjz commented on code in PR #26356:
URL: https://github.com/apache/doris/pull/26356#discussion_r1427678062
##########
fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java:
##########
@@ -74,30 +72,70 @@ public class InsertTask extends AbstractTask {
}
private String labelName;
Review Comment:
This is now used by getTvfInfo().
Re: [PR] [feature](Load)(step2)support nereids load job schedule [doris]
Posted by "wsjz (via GitHub)" <gi...@apache.org>.
wsjz commented on code in PR #26356:
URL: https://github.com/apache/doris/pull/26356#discussion_r1434217677
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/load/LabelProcessor.java:
##########
@@ -0,0 +1,181 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.jobs.load;
+
+import org.apache.doris.catalog.Database;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.CaseSensibility;
+import org.apache.doris.common.LabelAlreadyUsedException;
+import org.apache.doris.common.PatternMatcher;
+import org.apache.doris.common.PatternMatcherWrapper;
+import org.apache.doris.job.exception.JobException;
+import org.apache.doris.job.extensions.insert.InsertJob;
+
+import com.google.common.base.Strings;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+
+/**
+ * label manager
+ */
+public class LabelProcessor {
+ private final Map<Long, Map<String, List<InsertJob>>> dbIdToLabelToLoadJobs = new ConcurrentHashMap<>();
+ private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
+
+ private void readLock() {
+ lock.readLock().lock();
+ }
+
+ private void readUnlock() {
+ lock.readLock().unlock();
+ }
+
+ private void writeLock() {
+ lock.writeLock().lock();
+ }
+
+ private void writeUnlock() {
+ lock.writeLock().unlock();
+ }
+
+ /**
+ * get jobs with label
+ * @param db db
+ * @return jobs
+ * @throws JobException e
+ */
+ public List<InsertJob> getJobs(Database db) throws JobException {
+ readLock();
+ try {
+ Map<String, List<InsertJob>> labelToLoadJobs = dbIdToLabelToLoadJobs.get(db.getId());
+ if (labelToLoadJobs == null) {
+ throw new JobException("Load job does not exist");
+ }
+ return labelToLoadJobs.values().stream().flatMap(Collection::stream).collect(Collectors.toList());
+ } finally {
+ readUnlock();
+ }
+ }
+
+ /**
+ * add job with label
+ *
+ * @param job job with label
+ * @throws LabelAlreadyUsedException e
+ */
+ public void addJob(InsertJob job) throws LabelAlreadyUsedException {
+ writeLock();
+ try {
+ Map<String, List<InsertJob>> labelToLoadJobs;
+ if (!dbIdToLabelToLoadJobs.containsKey(job.getDbId())) {
+ labelToLoadJobs = new ConcurrentHashMap<>();
+ dbIdToLabelToLoadJobs.put(job.getDbId(), labelToLoadJobs);
+ }
+ labelToLoadJobs = dbIdToLabelToLoadJobs.get(job.getDbId());
+ if (labelToLoadJobs.containsKey(job.getLabelName())) {
+ throw new LabelAlreadyUsedException(job.getLabelName());
+ } else {
+ labelToLoadJobs.put(job.getLabelName(), new ArrayList<>());
+ }
+ labelToLoadJobs.get(job.getLabelName()).add(job);
+ } finally {
+ writeUnlock();
+ }
+ }
+
+ /**
+ * support remove label job
+ * @param dbId db id
+ * @param labelName label name
+ */
+ public void removeJob(long dbId, String labelName) {
Review Comment:
TODO: this will be used by the method that cleans up old labels.
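`removeJob` is only a TODO in this revision. A possible shape for it, together with the add/get logic above, is sketched below in simplified, self-contained Java. `LabelIndex` is a hypothetical stand-in for `LabelProcessor`: jobs are a generic type `J` instead of `InsertJob`, a duplicate label raises `IllegalStateException` rather than `LabelAlreadyUsedException`, and `getJobs` returns an empty list instead of throwing `JobException` when the database has no jobs.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;

// Hypothetical, simplified stand-in for LabelProcessor (db id -> label -> jobs).
public class LabelIndex<J> {
    private final Map<Long, Map<String, List<J>>> dbIdToLabelToJobs = new HashMap<>();
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);

    // Register a job; reject a label that is already used in the same database.
    public void addJob(long dbId, String label, J job) {
        lock.writeLock().lock();
        try {
            Map<String, List<J>> labelToJobs = dbIdToLabelToJobs.computeIfAbsent(dbId, k -> new HashMap<>());
            if (labelToJobs.containsKey(label)) {
                throw new IllegalStateException("Label already used: " + label);
            }
            labelToJobs.computeIfAbsent(label, k -> new ArrayList<>()).add(job);
        } finally {
            lock.writeLock().unlock();
        }
    }

    // All jobs of one database, flattened across labels.
    public List<J> getJobs(long dbId) {
        lock.readLock().lock();
        try {
            Map<String, List<J>> labelToJobs = dbIdToLabelToJobs.get(dbId);
            if (labelToJobs == null) {
                return List.of();
            }
            return labelToJobs.values().stream().flatMap(Collection::stream).collect(Collectors.toList());
        } finally {
            lock.readLock().unlock();
        }
    }

    // Possible removeJob: drop the label, then the database entry once it is empty.
    public void removeJob(long dbId, String label) {
        lock.writeLock().lock();
        try {
            Map<String, List<J>> labelToJobs = dbIdToLabelToJobs.get(dbId);
            if (labelToJobs == null) {
                return;
            }
            labelToJobs.remove(label);
            if (labelToJobs.isEmpty()) {
                dbIdToLabelToJobs.remove(dbId);
            }
        } finally {
            lock.writeLock().unlock();
        }
    }
}
```

Because every access in this sketch goes through the read/write lock, plain `HashMap`s are sufficient, and `computeIfAbsent` replaces the separate `containsKey`/`put`/`get` steps. Removing the database entry once its last label is gone keeps label cleanup from leaving empty maps behind.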
Re: [PR] [feature](Load)(step2)support nereids load job schedule [doris]
Posted by "morningman (via GitHub)" <gi...@apache.org>.
morningman merged PR #26356:
URL: https://github.com/apache/doris/pull/26356
Re: [PR] [feature](Load)(step2)support nereids load job schedule [doris]
Posted by "wsjz (via GitHub)" <gi...@apache.org>.
wsjz commented on PR #26356:
URL: https://github.com/apache/doris/pull/26356#issuecomment-1859747929
run buildall
Re: [PR] [feature](Load)(step2)support nereids load job schedule [doris]
Posted by "doris-robot (via GitHub)" <gi...@apache.org>.
doris-robot commented on PR #26356:
URL: https://github.com/apache/doris/pull/26356#issuecomment-1859955145
(From new machine) TeamCity pipeline, ClickBench performance test result:
the sum of best hot time: 43.64 seconds
stream load tsv: 579 seconds loaded 74807831229 Bytes, about 123 MB/s
stream load json: 19 seconds loaded 2358488459 Bytes, about 118 MB/s
stream load orc: 66 seconds loaded 1101869774 Bytes, about 15 MB/s
stream load parquet: 32 seconds loaded 861443392 Bytes, about 25 MB/s
insert into select: 28.6 seconds inserted 10000000 Rows, about 349K ops/s
storage size: 17219926919 Bytes
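The throughput figures in these ClickBench reports appear to be derived from the byte or row count and the elapsed seconds. The sketch below reproduces the numbers in the report above under an assumption inferred from the numbers, not from the pipeline's code: "MB/s" means MiB/s (bytes / seconds / 2^20) truncated to an integer, and "K ops/s" is rows / seconds / 1000, also truncated. Since the elapsed seconds are themselves rounded, treat this as a consistency check rather than the pipeline's exact formula.

```java
public class ClickBenchRates {
    private static final long MIB = 1024L * 1024L;

    // Bytes per second expressed in MiB, truncated toward zero (assumed rounding).
    public static long mibPerSecond(long bytes, long seconds) {
        return bytes / seconds / MIB;
    }

    // Rows per second in thousands, truncated toward zero (assumed rounding).
    public static long kiloRowsPerSecond(long rows, double seconds) {
        return (long) (rows / seconds / 1000.0);
    }

    public static void main(String[] args) {
        System.out.println(mibPerSecond(74807831229L, 579));    // 123 (stream load tsv)
        System.out.println(mibPerSecond(2358488459L, 19));      // 118 (stream load json)
        System.out.println(mibPerSecond(1101869774L, 66));      // 15 (stream load orc)
        System.out.println(mibPerSecond(861443392L, 32));       // 25 (stream load parquet)
        System.out.println(kiloRowsPerSecond(10000000L, 28.6)); // 349 (insert into select)
    }
}
```

With decimal megabytes the TSV rate would come out near 129 MB/s rather than 123, and rounding instead of truncating would give 16 and 26 for ORC and Parquet, which is why truncated MiB/s looks like the better fit.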
Re: [PR] [feature](Load)(step2)support nereids load job schedule [doris]
Posted by "wsjz (via GitHub)" <gi...@apache.org>.
wsjz commented on PR #26356:
URL: https://github.com/apache/doris/pull/26356#issuecomment-1859653214
run buildall
Re: [PR] [feature](Load)(step2)support nereids load job schedule [doris]
Posted by "wsjz (via GitHub)" <gi...@apache.org>.
wsjz commented on PR #26356:
URL: https://github.com/apache/doris/pull/26356#issuecomment-1858708546
run buildall
Re: [PR] [feature](Load)(step2)support nereids load job schedule [doris]
Posted by "doris-robot (via GitHub)" <gi...@apache.org>.
doris-robot commented on PR #26356:
URL: https://github.com/apache/doris/pull/26356#issuecomment-1859698163
(From new machine) TeamCity pipeline, ClickBench performance test result:
the sum of best hot time: 44.84 seconds
stream load tsv: 576 seconds loaded 74807831229 Bytes, about 123 MB/s
stream load json: 19 seconds loaded 2358488459 Bytes, about 118 MB/s
stream load orc: 66 seconds loaded 1101869774 Bytes, about 15 MB/s
stream load parquet: 32 seconds loaded 861443392 Bytes, about 25 MB/s
insert into select: 28.9 seconds inserted 10000000 Rows, about 346K ops/s
storage size: 17216358140 Bytes
Re: [PR] [feature](Load)(step2)support nereids load job schedule [doris]
Posted by "wsjz (via GitHub)" <gi...@apache.org>.
wsjz commented on PR #26356:
URL: https://github.com/apache/doris/pull/26356#issuecomment-1859853077
run buildall
Re: [PR] [feature](Load)(step2)support nereids load job schedule [doris]
Posted by "doris-robot (via GitHub)" <gi...@apache.org>.
doris-robot commented on PR #26356:
URL: https://github.com/apache/doris/pull/26356#issuecomment-1859544368
(From new machine) TeamCity pipeline, ClickBench performance test result:
the sum of best hot time: 44.14 seconds
stream load tsv: 575 seconds loaded 74807831229 Bytes, about 124 MB/s
stream load json: 19 seconds loaded 2358488459 Bytes, about 118 MB/s
stream load orc: 66 seconds loaded 1101869774 Bytes, about 15 MB/s
stream load parquet: 32 seconds loaded 861443392 Bytes, about 25 MB/s
insert into select: 28.4 seconds inserted 10000000 Rows, about 352K ops/s
storage size: 17220998870 Bytes
Re: [PR] [feature](Load)(step2)support nereids load job schedule [doris]
Posted by "wsjz (via GitHub)" <gi...@apache.org>.
wsjz commented on PR #26356:
URL: https://github.com/apache/doris/pull/26356#issuecomment-1860194357
run buildall
Re: [PR] [feature](Load)(step2)support nereids load job schedule [doris]
Posted by "doris-robot (via GitHub)" <gi...@apache.org>.
doris-robot commented on PR #26356:
URL: https://github.com/apache/doris/pull/26356#issuecomment-1859845959
TPC-H test result on machine: 'aliyun_ecs.c7a.8xlarge_32C64G'
```
Tpch sf100 test result on commit bfaf47d2d8d5afd2139a72fa8158f34d4871a719, data reload: false
run tpch-sf100 query with default conf and session variables
q1 4731 4468 4460 4460
q2 368 152 158 152
q3 1484 1270 1242 1242
q4 1123 904 880 880
q5 3124 3174 3183 3174
q6 244 127 130 127
q7 979 490 493 490
q8 2195 2223 2194 2194
q9 6722 6681 6666 6666
q10 3233 3255 3274 3255
q11 320 207 212 207
q12 351 211 209 209
q13 4541 3807 3809 3807
q14 240 207 219 207
q15 565 523 527 523
q16 443 378 385 378
q17 1025 569 460 460
q18 7161 6984 6918 6918
q19 1511 1446 1373 1373
q20 554 320 283 283
q21 3073 2643 2731 2643
q22 348 287 282 282
Total cold run time: 44335 ms
Total hot run time: 39930 ms
run tpch-sf100 query with default conf and set session variable runtime_filter_mode=off
q1 4413 4392 4371 4371
q2 269 160 172 160
q3 3561 3532 3523 3523
q4 2392 2381 2375 2375
q5 5735 5745 5754 5745
q6 244 121 125 121
q7 2413 1865 1858 1858
q8 3537 3513 3523 3513
q9 9041 9067 9004 9004
q10 3887 3980 4005 3980
q11 512 392 379 379
q12 758 611 592 592
q13 4302 3585 3543 3543
q14 292 260 248 248
q15 565 517 515 515
q16 501 473 471 471
q17 1888 1864 1880 1864
q18 8666 8190 8289 8190
q19 1717 1782 1783 1782
q20 2269 1966 1934 1934
q21 6545 6218 6166 6166
q22 484 442 424 424
Total cold run time: 63991 ms
Total hot run time: 60758 ms
```
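Reading the TPC-H table: each row appears to list one cold run, two hot runs, and the faster of the two hot runs, all in milliseconds. The sketch below checks that interpretation against the default-conf result above: summing the first column gives the reported total cold run time, and summing the minimum of the two hot runs gives the reported total hot run time. The column meanings are inferred from the numbers, not taken from the benchmark script.

```java
public class TpchTotals {
    // {cold run, hot run 1, hot run 2} in ms for q1..q22, copied from the
    // "default conf and session variables" result above.
    public static final int[][] RUNS = {
        {4731, 4468, 4460}, {368, 152, 158}, {1484, 1270, 1242}, {1123, 904, 880},
        {3124, 3174, 3183}, {244, 127, 130}, {979, 490, 493}, {2195, 2223, 2194},
        {6722, 6681, 6666}, {3233, 3255, 3274}, {320, 207, 212}, {351, 211, 209},
        {4541, 3807, 3809}, {240, 207, 219}, {565, 523, 527}, {443, 378, 385},
        {1025, 569, 460}, {7161, 6984, 6918}, {1511, 1446, 1373}, {554, 320, 283},
        {3073, 2643, 2731}, {348, 287, 282},
    };

    // Sum of the first (cold) column.
    public static long coldTotal(int[][] runs) {
        long sum = 0;
        for (int[] r : runs) {
            sum += r[0];
        }
        return sum;
    }

    // Assumes the fourth column of the report is the faster of the two hot runs.
    public static long bestHotTotal(int[][] runs) {
        long sum = 0;
        for (int[] r : runs) {
            sum += Math.min(r[1], r[2]);
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println("Total cold run time: " + coldTotal(RUNS) + " ms");  // 44335 ms
        System.out.println("Total hot run time: " + bestHotTotal(RUNS) + " ms"); // 39930 ms
    }
}
```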
Re: [PR] [feature](Load)(step2)support nereids load job schedule [doris]
Posted by "doris-robot (via GitHub)" <gi...@apache.org>.
doris-robot commented on PR #26356:
URL: https://github.com/apache/doris/pull/26356#issuecomment-1859836459
(From new machine) TeamCity pipeline, ClickBench performance test result:
the sum of best hot time: 43.98 seconds
stream load tsv: 575 seconds loaded 74807831229 Bytes, about 124 MB/s
stream load json: 19 seconds loaded 2358488459 Bytes, about 118 MB/s
stream load orc: 66 seconds loaded 1101869774 Bytes, about 15 MB/s
stream load parquet: 32 seconds loaded 861443392 Bytes, about 25 MB/s
insert into select: 28.7 seconds inserted 10000000 Rows, about 348K ops/s
storage size: 17219842626 Bytes
Re: [PR] [feature](Load)(step2)support nereids load job schedule [doris]
Posted by "doris-robot (via GitHub)" <gi...@apache.org>.
doris-robot commented on PR #26356:
URL: https://github.com/apache/doris/pull/26356#issuecomment-1859975066
TPC-H test result on machine: 'aliyun_ecs.c7a.8xlarge_32C64G'
```
Tpch sf100 test result on commit 1a3e98ff0a58621903f0c6728327143ef13ada75, data reload: false
run tpch-sf100 query with default conf and session variables
q1 4704 4455 4508 4455
q2 368 148 159 148
q3 1450 1235 1279 1235
q4 1112 884 899 884
q5 3131 3136 3132 3132
q6 243 125 127 125
q7 998 483 487 483
q8 2189 2234 2203 2203
q9 6691 6657 6688 6657
q10 3220 3278 3257 3257
q11 315 206 201 201
q12 349 208 206 206
q13 4555 3762 3792 3762
q14 247 217 208 208
q15 565 520 522 520
q16 438 381 379 379
q17 1016 607 537 537
q18 7236 6874 6933 6874
q19 1530 1479 1388 1388
q20 546 311 319 311
q21 3054 2676 2683 2676
q22 347 279 282 279
Total cold run time: 44304 ms
Total hot run time: 39920 ms
run tpch-sf100 query with default conf and set session variable runtime_filter_mode=off
q1 4431 4432 4393 4393
q2 264 164 170 164
q3 3538 3525 3525 3525
q4 2379 2371 2352 2352
q5 5736 5723 5734 5723
q6 241 121 123 121
q7 2384 1903 1867 1867
q8 3528 3529 3523 3523
q9 9005 8989 8956 8956
q10 3887 3997 3997 3997
q11 499 384 383 383
q12 767 600 599 599
q13 4281 3586 3547 3547
q14 280 258 245 245
q15 567 522 517 517
q16 499 442 467 442
q17 1883 1852 1846 1846
q18 8636 8408 8355 8355
q19 1726 1809 1794 1794
q20 2267 1952 1944 1944
q21 6534 6204 6179 6179
q22 496 430 435 430
Total cold run time: 63828 ms
Total hot run time: 60902 ms
```
Re: [PR] [feature](Load)(step2)support nereids load job schedule [doris]
Posted by "doris-robot (via GitHub)" <gi...@apache.org>.
doris-robot commented on PR #26356:
URL: https://github.com/apache/doris/pull/26356#issuecomment-1862073182
(From new machine) TeamCity pipeline, ClickBench performance test result:
the sum of best hot time: 44.63 seconds
stream load tsv: 581 seconds loaded 74807831229 Bytes, about 122 MB/s
stream load json: 19 seconds loaded 2358488459 Bytes, about 118 MB/s
stream load orc: 66 seconds loaded 1101869774 Bytes, about 15 MB/s
stream load parquet: 33 seconds loaded 861443392 Bytes, about 24 MB/s
insert into select: 29.2 seconds inserted 10000000 Rows, about 342K ops/s
storage size: 17221068688 Bytes
Re: [PR] [feature](Load)(step2)support nereids load job schedule [doris]
Posted by "wsjz (via GitHub)" <gi...@apache.org>.
wsjz commented on PR #26356:
URL: https://github.com/apache/doris/pull/26356#issuecomment-1868817492
run buildall
Re: [PR] [feature](Load)(step2)support nereids load job schedule [doris]
Posted by "wsjz (via GitHub)" <gi...@apache.org>.
wsjz commented on PR #26356:
URL: https://github.com/apache/doris/pull/26356#issuecomment-1859385614
run buildall
Re: [PR] [feature](Load)(step2)support nereids load job schedule [doris]
Posted by "wsjz (via GitHub)" <gi...@apache.org>.
wsjz commented on code in PR #26356:
URL: https://github.com/apache/doris/pull/26356#discussion_r1427703299
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/load/LoadMgr.java:
##########
@@ -0,0 +1,340 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.jobs.load;
+
+import org.apache.doris.analysis.CompoundPredicate;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.CaseSensibility;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.LabelAlreadyUsedException;
+import org.apache.doris.common.PatternMatcher;
+import org.apache.doris.common.PatternMatcherWrapper;
+import org.apache.doris.common.util.LogBuilder;
+import org.apache.doris.common.util.LogKey;
+import org.apache.doris.job.base.JobExecuteType;
+import org.apache.doris.job.base.JobExecutionConfiguration;
+import org.apache.doris.job.common.JobStatus;
+import org.apache.doris.job.exception.JobException;
+import org.apache.doris.job.extensions.insert.InsertJob;
+import org.apache.doris.job.manager.JobManager;
+import org.apache.doris.load.loadv2.JobState;
+import org.apache.doris.nereids.jobs.load.replay.ReplayLoadLog;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+
+/**
+ * load manager
+ */
+public class LoadMgr {
+ private static final Logger LOG = LogManager.getLogger(LoadMgr.class);
+ private Map<Long, InsertJob> loadIdToJob = new HashMap<>();
+ private final Map<Long, Map<String, List<InsertJob>>> dbIdToLabelToLoadJobs = new ConcurrentHashMap<>();
+
+ // lock for export job
+ // lock is private and must use after db lock
+ private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
+
+ private void readLock() {
+ lock.readLock().lock();
+ }
+
+ private void readUnlock() {
+ lock.readLock().unlock();
+ }
+
+ private void writeLock() {
+ lock.writeLock().lock();
+ }
+
+ private void writeUnlock() {
+ lock.writeLock().unlock();
+ }
+
+ private JobManager<InsertJob, ?> getJobManager() {
+ return Env.getCurrentEnv().getJobManager();
+ }
+
+ /**
+ * add load job and add tasks
+ * @param loadJob job
+ */
+ public void addLoadJob(InsertJob loadJob) throws DdlException {
+ writeLock();
+ try {
+ Map<String, List<InsertJob>> labelToLoadJobs = dbIdToLabelToLoadJobs.get(loadJob.getDbId());
+ if (labelToLoadJobs != null && labelToLoadJobs.containsKey(loadJob.getLabel())) {
+ throw new LabelAlreadyUsedException(loadJob.getLabel());
+ }
+ unprotectAddJob(loadJob);
+ } catch (LabelAlreadyUsedException e) {
+ throw new RuntimeException(e);
+ } finally {
+ writeUnlock();
+ }
+ }
+
+ private void unprotectAddJob(InsertJob job) throws DdlException {
+ loadIdToJob.put(job.getJobId(), job);
+ try {
+ getJobManager().registerJob(job);
+ if (!dbIdToLabelToLoadJobs.containsKey(job.getDbId())) {
+ dbIdToLabelToLoadJobs.put(job.getDbId(), new ConcurrentHashMap<>());
+ }
+ Map<String, List<InsertJob>> labelToLoadJobs = dbIdToLabelToLoadJobs.get(job.getDbId());
+ if (!labelToLoadJobs.containsKey(job.getLabel())) {
+ labelToLoadJobs.put(job.getLabel(), new ArrayList<>());
+ }
+ labelToLoadJobs.get(job.getLabel()).add(job);
+ } catch (org.apache.doris.job.exception.JobException e) {
+ throw new DdlException(e.getMessage(), e);
+ }
+ }
+
+ /**
+ * replay load job
+ * @param replayLoadLog load log
+ * @throws DdlException ex
+ */
+ public void replayLoadJob(ReplayLoadLog replayLoadLog) throws DdlException {
+ writeLock();
+ try {
+ if (replayLoadLog instanceof ReplayLoadLog.ReplayCreateLoadLog) {
+ InsertJob loadJob = new InsertJob((ReplayLoadLog.ReplayCreateLoadLog) replayLoadLog);
+ JobExecutionConfiguration jobConfig = new JobExecutionConfiguration();
+ jobConfig.setExecuteType(JobExecuteType.INSTANT);
+ loadJob.setJobConfig(jobConfig);
+ addLoadJob(loadJob);
+ LOG.info(new LogBuilder(LogKey.LOAD_JOB, loadJob.getJobId()).add("msg", "replay create load job")
+ .build());
+ } else if (replayLoadLog instanceof ReplayLoadLog.ReplayEndLoadLog) {
+ InsertJob job = loadIdToJob.get(replayLoadLog.getId());
+ if (job == null) {
+ // This should not happen.
+ // Last time I found that when user submit a job with already used label, an END_LOAD_JOB edit log
+ // will be written but the job is not added to 'idToLoadJob', so this job here we got will be null.
+ // And this bug has been fixed.
+ // Just add a log here to observe.
+ LOG.warn("job does not exist when replaying end load job edit log: {}", replayLoadLog);
+ return;
+ }
+ job.unprotectReadEndOperation((ReplayLoadLog.ReplayEndLoadLog) replayLoadLog);
+ LOG.info(new LogBuilder(LogKey.LOAD_JOB, replayLoadLog.getId()).add("operation", replayLoadLog)
+ .add("msg", "replay end load job").build());
+ } else {
+ throw new DdlException("Unsupported replay job type. ");
+ }
+ } finally {
+ writeUnlock();
+ }
+ }
+
+ // public void updateJobProgress(Long jobId, Long beId, TUniqueId loadId, TUniqueId fragmentId, long scannedRows,
+ // long scannedBytes, boolean isDone) {
+ // LoadJobExecutor job = loadIdToJob.get(jobId);
+ // if (job != null) {
+ // job.updateLoadingStatus(beId, loadId, fragmentId, scannedRows, scannedBytes, isDone);
+ // }
+ // }
+
+ /**
+ * cancel job
+ *
+ * @param dbName dbName
+ * @param label job label
+ * @param state job state
+ * @param operator filter operator, like or equals
+ */
+ public void cancelLoadJob(String dbName, String label, String state, CompoundPredicate.Operator operator)
+ throws JobException, AnalysisException, DdlException {
+ Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(dbName);
+ // List of load jobs waiting to be cancelled
+ List<InsertJob> uncompletedLoadJob;
+ readLock();
+ try {
+ Map<String, List<InsertJob>> labelToLoadJobs = dbIdToLabelToLoadJobs.get(db.getId());
+ if (labelToLoadJobs == null) {
+ throw new JobException("Load job does not exist");
+ }
+ List<InsertJob> matchLoadJobs = Lists.newArrayList();
+ addNeedCancelLoadJob(label, state, operator,
+ labelToLoadJobs.values().stream().flatMap(Collection::stream).collect(Collectors.toList()),
+ matchLoadJobs);
+ if (matchLoadJobs.isEmpty()) {
+ throw new JobException("Load job does not exist");
+ }
+ // check state here
+ uncompletedLoadJob =
+ matchLoadJobs.stream().filter(InsertJob::isRunning)
+ .collect(Collectors.toList());
+ if (uncompletedLoadJob.isEmpty()) {
+ throw new JobException("There is no uncompleted job");
+ }
+ } finally {
+ readUnlock();
+ }
+ for (InsertJob loadJob : uncompletedLoadJob) {
+ try {
+ loadJob.cancelJob();
+ } catch (JobException e) {
+ LOG.warn("Fail to cancel job, its label: {}", loadJob.getLabel());
+ }
+ }
+ }
+
+ private static void addNeedCancelLoadJob(String label, String state,
Review Comment:
Not required, but I suggest making a method static if it doesn't use any instance fields; that makes it refactoring-friendly.
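A minimal sketch of such a static filter helper follows. It is not the PR's `addNeedCancelLoadJob`: `Job`, `Operator` and `likeToRegex` are hypothetical stand-ins that assume a label filter of either exact match or SQL `LIKE` (`%`, `_`) plus an optional state filter. Because it reads only its arguments, it can be unit-tested without a `LoadMgr` instance.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

public class CancelFilterSketch {
    // Hypothetical stand-in for InsertJob: only the fields the filter reads.
    record Job(String label, String state) {}

    // Hypothetical label operator: exact match or SQL LIKE.
    enum Operator { EQ, LIKE }

    // Static on purpose: no instance state is touched, so the helper is
    // trivially movable and testable on its own.
    static List<Job> matchJobs(String label, String state, Operator op, List<Job> jobs) {
        Pattern likePattern = op == Operator.LIKE ? likeToRegex(label) : null;
        List<Job> matched = new ArrayList<>();
        for (Job job : jobs) {
            boolean labelOk = op == Operator.EQ
                    ? job.label().equals(label)
                    : likePattern.matcher(job.label()).matches();
            // A null state means "do not filter by state".
            boolean stateOk = state == null || job.state().equalsIgnoreCase(state);
            if (labelOk && stateOk) {
                matched.add(job);
            }
        }
        return matched;
    }

    // Translate SQL LIKE wildcards (% and _) into an anchored regex,
    // quoting every other character literally.
    static Pattern likeToRegex(String like) {
        StringBuilder sb = new StringBuilder();
        for (char c : like.toCharArray()) {
            if (c == '%') {
                sb.append(".*");
            } else if (c == '_') {
                sb.append('.');
            } else {
                sb.append(Pattern.quote(String.valueOf(c)));
            }
        }
        return Pattern.compile(sb.toString());
    }

    public static void main(String[] args) {
        List<Job> jobs = List.of(new Job("label_1", "LOADING"),
                new Job("label_2", "FINISHED"), new Job("other", "LOADING"));
        System.out.println(matchJobs("label%", "LOADING", Operator.LIKE, jobs).size()); // prints 1
    }
}
```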
Re: [PR] [feature](Load)(step2)support nereids load job schedule [doris]
Posted by "doris-robot (via GitHub)" <gi...@apache.org>.
doris-robot commented on PR #26356:
URL: https://github.com/apache/doris/pull/26356#issuecomment-1858727499
(From new machine) TeamCity pipeline, ClickBench performance test result:
the sum of best hot time: 45.22 seconds
stream load tsv: 589 seconds loaded 74807831229 Bytes, about 121 MB/s
stream load json: 19 seconds loaded 2358488459 Bytes, about 118 MB/s
stream load orc: 67 seconds loaded 1101869774 Bytes, about 15 MB/s
stream load parquet: 33 seconds loaded 861443392 Bytes, about 24 MB/s
insert into select: 28.9 seconds inserted 10000000 Rows, about 346K ops/s
storage size: 17219958200 Bytes
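As a quick sanity check of the stream load figures above, this sketch recomputes throughput from the reported bytes and seconds. It assumes "MB" means 2^20 bytes and that the report truncates rather than rounds; under that assumption it reproduces 121, 118, 15 and 24 MB/s.

```java
public class ThroughputCheck {
    // Assumption: "MB" = 2^20 bytes, and integer division truncates like the report.
    static long mibPerSecond(long bytes, long seconds) {
        return bytes / seconds / (1024L * 1024L);
    }

    public static void main(String[] args) {
        System.out.println(mibPerSecond(74807831229L, 589)); // 121 (tsv)
        System.out.println(mibPerSecond(2358488459L, 19));   // 118 (json)
        System.out.println(mibPerSecond(1101869774L, 67));   // 15  (orc)
        System.out.println(mibPerSecond(861443392L, 33));    // 24  (parquet)
    }
}
```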
Re: [PR] [feature](Load)(step2)support nereids load job schedule [doris]
Posted by "wsjz (via GitHub)" <gi...@apache.org>.
wsjz commented on code in PR #26356:
URL: https://github.com/apache/doris/pull/26356#discussion_r1427678969
##########
fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java:
##########
@@ -74,30 +72,70 @@ public class InsertTask extends AbstractTask {
}
private String labelName;
-
private InsertIntoTableCommand command;
-
private StmtExecutor stmtExecutor;
-
private ConnectContext ctx;
-
private String sql;
-
private String currentDb;
-
private UserIdentity userIdentity;
-
+ private LoadStatistic loadStatistic;
private AtomicBoolean isCanceled = new AtomicBoolean(false);
-
private AtomicBoolean isFinished = new AtomicBoolean(false);
-
private static final String LABEL_SPLITTER = "_";
+ private FailMsg failMsg;
+ @Getter
+ private String trackingUrl;
@Getter
@Setter
- private LoadJob loadJob;
+ private InsertJob jobInfo;
+ private TaskType taskType = TaskType.PENDING;
+ private MergeType mergeType = MergeType.APPEND;
+
+ /**
+ * task merge type
+ */
+ enum MergeType {
+ MERGE,
+ APPEND,
+ DELETE
+ }
+
+ /**
+ * task type
+ */
+ enum TaskType {
+ UNKNOWN, // this is only for ISSUE #2354
+ PENDING,
+ LOADING,
+ FINISHED,
+ FAILED,
+ CANCELLED
+ }
+
+ public InsertTask(InsertIntoTableCommand insertInto,
+ ConnectContext ctx, StmtExecutor executor, LoadStatistic statistic) {
+ this(null, insertInto, ctx, executor, statistic);
+ }
+
+ public InsertTask(String labelName, String currentDb, String sql, UserIdentity userIdentity) {
+ this.labelName = labelName;
+ this.sql = sql;
+ this.currentDb = currentDb;
+ this.userIdentity = userIdentity;
+ setTaskId(Env.getCurrentEnv().getNextId());
+ }
+ public InsertTask(String labelName, InsertIntoTableCommand insertInto,
+ ConnectContext ctx, StmtExecutor executor, LoadStatistic statistic) {
+ this.labelName = labelName;
Review Comment:
The first constructor doesn't need `loadStatistic`.
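One way to read this suggestion, sketched with hypothetical names (`InsertTaskSketch` and its nested `LoadStatistic` are stand-ins, not the PR's classes): only the constructor that actually uses a statistic takes one, and the convenience overload delegates without it.

```java
public class InsertTaskSketch {
    // Hypothetical stand-in for the PR's LoadStatistic.
    static final class LoadStatistic {}

    private final String labelName;
    private final String sql;
    private final LoadStatistic loadStatistic; // null when the task runs plain SQL

    // SQL-only path: nothing is planned up front, so no statistic is passed in.
    InsertTaskSketch(String labelName, String sql) {
        this(labelName, sql, null);
    }

    // Plan-based path: the caller (e.g. the owning job) shares its statistic.
    InsertTaskSketch(String labelName, String sql, LoadStatistic loadStatistic) {
        this.labelName = labelName;
        this.sql = sql;
        this.loadStatistic = loadStatistic;
    }

    boolean tracksStatistics() {
        return loadStatistic != null;
    }

    String labelName() {
        return labelName;
    }
}
```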
Re: [PR] [feature](Load)(step2)support nereids load job schedule [doris]
Posted by "doris-robot (via GitHub)" <gi...@apache.org>.
doris-robot commented on PR #26356:
URL: https://github.com/apache/doris/pull/26356#issuecomment-1858737446
<details>
<summary>TPC-H test result on machine: 'aliyun_ecs.c7a.8xlarge_32C64G'</summary>
```
Tpch sf100 test result on commit 09fcd7c63648ab365f94b9859fccc7812fb09b03, data reload: true
run tpch-sf100 query with default conf and session variables
query cold_run hot_run_1 hot_run_2 best_hot_run (ms)
q1 4650 4427 4413 4413
q2 381 120 118 118
q3 1456 1221 1204 1204
q4 1153 911 866 866
q5 3217 3228 3206 3206
q6 244 121 123 121
q7 964 526 542 526
q8 2155 2180 2189 2180
q9 7176 6692 6659 6659
q10 3219 3271 3265 3265
q11 324 202 201 201
q12 359 214 215 214
q13 4936 3851 3823 3823
q14 237 203 207 203
q15 566 507 518 507
q16 461 381 381 381
q17 1016 609 580 580
q18 7407 7035 6731 6731
q19 1555 1400 1451 1400
q20 501 284 290 284
q21 3059 2629 2659 2629
q22 344 271 277 271
Total cold run time: 45380 ms
Total hot run time: 39782 ms
run tpch-sf100 query with default conf and set session variable runtime_filter_mode=off
query cold_run hot_run_1 hot_run_2 best_hot_run (ms)
q1 4422 4397 4424 4397
q2 263 160 165 160
q3 3508 3492 3489 3489
q4 2391 2393 2391 2391
q5 5708 5697 5708 5697
q6 237 121 122 121
q7 2375 1851 1842 1842
q8 3519 3523 3518 3518
q9 9284 9027 9161 9027
q10 3917 4005 4023 4005
q11 504 385 394 385
q12 768 602 611 602
q13 4309 3572 3571 3571
q14 289 246 254 246
q15 585 528 526 526
q16 511 463 496 463
q17 1868 1866 1836 1836
q18 8517 8097 8337 8097
q19 1714 1765 1721 1721
q20 2274 1969 1937 1937
q21 6508 6240 6111 6111
q22 501 427 429 427
Total cold run time: 63972 ms
Total hot run time: 60569 ms
```
</details>
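The report does not label its columns. The sketch below checks one reading against the first table: column one is the cold run, columns two and three are hot runs, and column four is the faster hot run. Summing column one reproduces "Total cold run time: 45380 ms", and summing min(hot 1, hot 2) reproduces "Total hot run time: 39782 ms". The column meaning is inferred from these totals, not stated by the bot.

```java
public class TpchTotals {
    // {cold, hot run 1, hot run 2} in ms for q1..q22, copied from the first table above.
    static final int[][] RUNS = {
        {4650, 4427, 4413}, {381, 120, 118}, {1456, 1221, 1204}, {1153, 911, 866},
        {3217, 3228, 3206}, {244, 121, 123}, {964, 526, 542}, {2155, 2180, 2189},
        {7176, 6692, 6659}, {3219, 3271, 3265}, {324, 202, 201}, {359, 214, 215},
        {4936, 3851, 3823}, {237, 203, 207}, {566, 507, 518}, {461, 381, 381},
        {1016, 609, 580}, {7407, 7035, 6731}, {1555, 1400, 1451}, {501, 284, 290},
        {3059, 2629, 2659}, {344, 271, 277}
    };

    static long coldTotal() {
        long sum = 0;
        for (int[] r : RUNS) {
            sum += r[0];
        }
        return sum;
    }

    // Assumed meaning of the fourth column: the faster of the two hot runs.
    static long bestHotTotal() {
        long sum = 0;
        for (int[] r : RUNS) {
            sum += Math.min(r[1], r[2]);
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println(coldTotal());    // 45380
        System.out.println(bestHotTotal()); // 39782
    }
}
```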
Re: [PR] [feature](Load)(step2)support nereids load job schedule [doris]
Posted by "morningman (via GitHub)" <gi...@apache.org>.
morningman commented on code in PR #26356:
URL: https://github.com/apache/doris/pull/26356#discussion_r1424224847
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/load/LoadMgr.java:
##########
@@ -0,0 +1,314 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.jobs.load;
+
+import org.apache.doris.analysis.CompoundPredicate;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.CaseSensibility;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.LabelAlreadyUsedException;
+import org.apache.doris.common.PatternMatcher;
+import org.apache.doris.common.PatternMatcherWrapper;
+import org.apache.doris.common.util.LogBuilder;
+import org.apache.doris.common.util.LogKey;
+import org.apache.doris.job.common.JobStatus;
+import org.apache.doris.job.extensions.insert.InsertJob;
+import org.apache.doris.job.manager.JobManager;
+import org.apache.doris.load.DataTransFormMgr;
+import org.apache.doris.load.FailMsg;
+import org.apache.doris.load.loadv2.JobState;
+import org.apache.doris.nereids.jobs.load.replay.ReplayLoadLog;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+/**
+ * load manager
+ */
+public class LoadMgr extends DataTransFormMgr {
+ private static final Logger LOG = LogManager.getLogger(LoadMgr.class);
+ private Map<Long, InsertJob> loadIdToJob = new HashMap<>();
+ private Map<String, Long> labelToLoadJobId = new HashMap<>();
+ private Map<Long, Map<String, List<InsertJob>>> dbIdToLabelToLoadJobs = new ConcurrentHashMap<>();
+
+ private JobManager<InsertJob, ?> getJobManager() {
+ return Env.getCurrentEnv().getJobManager();
+ }
+
+ /**
+ * add load job and add tasks
+ * @param loadJob job
+ */
+ public void addLoadJob(InsertJob loadJob) throws DdlException {
+ writeLock();
+ try {
+ if (labelToLoadJobId.containsKey(loadJob.getLabel())) {
Review Comment:
Labels are unique within a database, so this check should use `dbIdToLabelToLoadJobs`.
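A minimal sketch of a database-scoped label check, assuming the same `dbId -> label -> jobs` shape as `dbIdToLabelToLoadJobs`. Job names are plain strings here and `synchronized` stands in for `writeLock()`; both are simplifications, not the PR's code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class LabelRegistrySketch {
    // dbId -> label -> job names, mirroring the shape of dbIdToLabelToLoadJobs.
    private final Map<Long, Map<String, List<String>>> dbIdToLabelToJobs = new ConcurrentHashMap<>();

    // Uniqueness is only required inside one database, so the lookup is scoped by dbId.
    synchronized void addJob(long dbId, String label, String jobName) {
        Map<String, List<String>> labelToJobs =
                dbIdToLabelToJobs.computeIfAbsent(dbId, k -> new ConcurrentHashMap<>());
        if (labelToJobs.containsKey(label)) {
            throw new IllegalStateException("Label already used: " + label);
        }
        labelToJobs.computeIfAbsent(label, k -> new ArrayList<>()).add(jobName);
    }
}
```

A global `labelToLoadJobId` map would instead reject the same label in two different databases.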
##########
fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java:
##########
@@ -75,55 +118,147 @@ public class InsertJob extends AbstractJob<InsertTask, Map> {
COLUMN_TO_INDEX = builder.build();
}
- @SerializedName(value = "lp")
- String labelPrefix;
+ @SerializedName("taskIdList")
+ ConcurrentLinkedQueue<Long> taskIdList;
- InsertIntoTableCommand command;
+ private final long dbId;
+ private String labelName;
+ private List<InsertIntoTableCommand> plans;
+ private InsertJob.LoadType loadType;
+ // 0: the job status is pending
+ // n/100: n is the number of task which has been finished
+ // 99: all tasks have been finished
+ // 100: txn status is visible and load has been finished
+ private int progress;
+ private long createTimestamp = System.currentTimeMillis();
+ private FailMsg failMsg;
+ private LoadStatistic loadStatistic = new LoadStatistic();
+ private Set<Long> finishedTaskIds = new HashSet<>();
+ private Set<String> tableNames;
+ private ConcurrentHashMap<Long, InsertTask> idToTasks = new ConcurrentHashMap<>();
+ private Map<String, String> properties;
+ private AuthorizationInfo authorizationInfo;
+ private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
+
+ private ConnectContext ctx;
+ private StmtExecutor stmtExecutor;
+ private List<ErrorTabletInfo> errorTabletInfos = new ArrayList<>();
+ private List<TabletCommitInfo> commitInfos = new ArrayList<>();
- StmtExecutor stmtExecutor;
+ // max save task num, do we need to config it?
+ private static final int MAX_SAVE_TASK_NUM = 100;
- ConnectContext ctx;
+ /**
+ * load job type
+ */
+ public enum LoadType {
+ BULK,
+ SPARK,
+ LOCAL_FILE,
+ UNKNOWN
- @SerializedName("tis")
- ConcurrentLinkedQueue<Long> taskIdList;
+ }
- // max save task num, do we need to config it?
- private static final int MAX_SAVE_TASK_NUM = 100;
+ public InsertJob(ReplayLoadLog.ReplayCreateLoadLog replayLoadLog) {
+ super(replayLoadLog.getId());
+ setJobId(replayLoadLog.getId());
+ this.dbId = replayLoadLog.getDbId();
+ }
+
+ public InsertJob(Long jobId, String jobName,
+ JobStatus jobStatus,
+ String currentDbName,
+ String comment,
+ UserIdentity createUser,
+ JobExecutionConfiguration jobConfig,
+ Long createTimeMs,
+ String executeSql) {
+ super(jobId, jobName, jobStatus, currentDbName, comment, createUser,
Review Comment:
You use `label` as the job's name.
But job names need to be globally unique.
So I think the job name should be `dbid+label+uuid`.
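A minimal sketch of the suggested naming scheme. The `_` separator and the helper name are hypothetical; `java.util.UUID.randomUUID()` supplies the global uniqueness.

```java
import java.util.UUID;

public class JobNameSketch {
    // Hypothetical format "<dbId>_<label>_<uuid>": the label is only unique per
    // database, so dbId keeps it readable and the UUID makes it global.
    static String uniqueJobName(long dbId, String label) {
        return dbId + "_" + label + "_" + UUID.randomUUID();
    }

    public static void main(String[] args) {
        System.out.println(uniqueJobName(10001L, "broker_load_1"));
    }
}
```

Labels may themselves contain `_`, so the label is better kept in its own field (as `labelName` already is) rather than parsed back out of the job name.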
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/load/LoadMgr.java:
##########
@@ -0,0 +1,314 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.jobs.load;
+
+import org.apache.doris.analysis.CompoundPredicate;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.CaseSensibility;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.LabelAlreadyUsedException;
+import org.apache.doris.common.PatternMatcher;
+import org.apache.doris.common.PatternMatcherWrapper;
+import org.apache.doris.common.util.LogBuilder;
+import org.apache.doris.common.util.LogKey;
+import org.apache.doris.job.common.JobStatus;
+import org.apache.doris.job.extensions.insert.InsertJob;
+import org.apache.doris.job.manager.JobManager;
+import org.apache.doris.load.DataTransFormMgr;
+import org.apache.doris.load.FailMsg;
+import org.apache.doris.load.loadv2.JobState;
+import org.apache.doris.nereids.jobs.load.replay.ReplayLoadLog;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+/**
+ * load manager
+ */
+public class LoadMgr extends DataTransFormMgr {
+ private static final Logger LOG = LogManager.getLogger(LoadMgr.class);
+ private Map<Long, InsertJob> loadIdToJob = new HashMap<>();
+ private Map<String, Long> labelToLoadJobId = new HashMap<>();
+ private Map<Long, Map<String, List<InsertJob>>> dbIdToLabelToLoadJobs = new ConcurrentHashMap<>();
+
+ private JobManager<InsertJob, ?> getJobManager() {
+ return Env.getCurrentEnv().getJobManager();
+ }
+
+ /**
+ * add load job and add tasks
+ * @param loadJob job
+ */
+ public void addLoadJob(InsertJob loadJob) throws DdlException {
+ writeLock();
+ try {
+ if (labelToLoadJobId.containsKey(loadJob.getLabel())) {
+ throw new LabelAlreadyUsedException(loadJob.getLabel());
+ }
+ unprotectAddJob(loadJob);
+ Env.getCurrentEnv().getEditLog().logLoadCreate(ReplayLoadLog.logCreateLoadOperation(loadJob));
Review Comment:
In `unprotectAddJob->registerJob()`, there is already an edit log.
And here is another edit log.
I think we should merge these two logs.
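A minimal sketch of the "one edit log per creation" idea. `CreateLoadJobLog` and the in-memory journal are hypothetical stand-ins for Doris' edit log. The point is that replay consumes a single record that carries everything needed to rebuild the job, so a follower never sees a half-logged creation.

```java
import java.util.ArrayList;
import java.util.List;

public class SingleEditLogSketch {
    // Hypothetical combined record: job registration and load metadata together.
    record CreateLoadJobLog(long jobId, long dbId, String label) {}

    private final List<CreateLoadJobLog> editLog = new ArrayList<>(); // stand-in for the journal
    private final List<Long> registeredJobIds = new ArrayList<>();

    // Register in memory, then persist exactly one record for the whole creation.
    synchronized void addLoadJob(long jobId, long dbId, String label) {
        registeredJobIds.add(jobId);
        editLog.add(new CreateLoadJobLog(jobId, dbId, label));
    }

    // Replay applies the same single record.
    synchronized void replay(CreateLoadJobLog log) {
        registeredJobIds.add(log.jobId());
    }

    synchronized List<CreateLoadJobLog> journal() {
        return List.copyOf(editLog);
    }

    synchronized int jobCount() {
        return registeredJobIds.size();
    }
}
```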
##########
fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java:
##########
@@ -75,55 +118,147 @@ public class InsertJob extends AbstractJob<InsertTask, Map> {
COLUMN_TO_INDEX = builder.build();
}
- @SerializedName(value = "lp")
- String labelPrefix;
+ @SerializedName("taskIdList")
+ ConcurrentLinkedQueue<Long> taskIdList;
- InsertIntoTableCommand command;
+ private final long dbId;
+ private String labelName;
+ private List<InsertIntoTableCommand> plans;
+ private InsertJob.LoadType loadType;
+ // 0: the job status is pending
+ // n/100: n is the number of task which has been finished
+ // 99: all tasks have been finished
+ // 100: txn status is visible and load has been finished
+ private int progress;
+ private long createTimestamp = System.currentTimeMillis();
+ private FailMsg failMsg;
+ private LoadStatistic loadStatistic = new LoadStatistic();
+ private Set<Long> finishedTaskIds = new HashSet<>();
+ private Set<String> tableNames;
+ private ConcurrentHashMap<Long, InsertTask> idToTasks = new ConcurrentHashMap<>();
+ private Map<String, String> properties;
+ private AuthorizationInfo authorizationInfo;
+ private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
+
+ private ConnectContext ctx;
+ private StmtExecutor stmtExecutor;
+ private List<ErrorTabletInfo> errorTabletInfos = new ArrayList<>();
+ private List<TabletCommitInfo> commitInfos = new ArrayList<>();
- StmtExecutor stmtExecutor;
+ // max save task num, do we need to config it?
+ private static final int MAX_SAVE_TASK_NUM = 100;
- ConnectContext ctx;
+ /**
+ * load job type
+ */
+ public enum LoadType {
+ BULK,
+ SPARK,
+ LOCAL_FILE,
+ UNKNOWN
- @SerializedName("tis")
- ConcurrentLinkedQueue<Long> taskIdList;
+ }
- // max save task num, do we need to config it?
- private static final int MAX_SAVE_TASK_NUM = 100;
+ public InsertJob(ReplayLoadLog.ReplayCreateLoadLog replayLoadLog) {
+ super(replayLoadLog.getId());
+ setJobId(replayLoadLog.getId());
+ this.dbId = replayLoadLog.getDbId();
+ }
+
+ public InsertJob(Long jobId, String jobName,
+ JobStatus jobStatus,
+ String currentDbName,
+ String comment,
+ UserIdentity createUser,
+ JobExecutionConfiguration jobConfig,
+ Long createTimeMs,
+ String executeSql) {
+ super(jobId, jobName, jobStatus, currentDbName, comment, createUser,
+ jobConfig, createTimeMs, executeSql, null);
+ this.dbId = ConnectContext.get().getCurrentDbId();
+ }
+
+ public InsertJob(ConnectContext ctx,
+ StmtExecutor executor,
+ String labelName,
+ List<InsertIntoTableCommand> plans,
+ Set<String> sinkTableNames,
+ Map<String, String> properties,
+ String comment,
+ JobExecutionConfiguration jobConfig) {
+ super(Env.getCurrentEnv().getNextId(), labelName, JobStatus.RUNNING, null,
+ comment, ctx.getCurrentUserIdentity(), jobConfig);
+ this.ctx = ctx;
+ this.plans = plans;
+ this.stmtExecutor = executor;
+ this.dbId = ctx.getCurrentDbId();
+ this.labelName = labelName;
+ this.tableNames = sinkTableNames;
+ this.properties = properties;
+ // TODO: not support other type yet
+ this.loadType = InsertJob.LoadType.BULK;
+ }
@Override
public List<InsertTask> createTasks(TaskType taskType, Map taskContext) {
- //nothing need to do in insert job
- InsertTask task = new InsertTask(null, getCurrentDbName(), getExecuteSql(), getCreateUser());
- task.setJobId(getJobId());
- task.setTaskType(taskType);
- task.setTaskId(Env.getCurrentEnv().getNextId());
+ if (plans.isEmpty()) {
+ InsertTask task = new InsertTask(labelName, getCurrentDbName(), getExecuteSql(), getCreateUser());
+ task.setJobId(getJobId());
+ task.setTaskType(taskType);
+ task.setTaskId(Env.getCurrentEnv().getNextId());
+ ArrayList<InsertTask> tasks = new ArrayList<>();
+ tasks.add(task);
+ super.initTasks(tasks);
+ addNewTask(task.getTaskId());
+ return tasks;
+ } else {
+ return createBatchTasks(taskType);
+ }
+ }
+
+ private List<InsertTask> createBatchTasks(TaskType taskType) {
ArrayList<InsertTask> tasks = new ArrayList<>();
- tasks.add(task);
- super.initTasks(tasks);
- addNewTask(task.getTaskId());
- return tasks;
+ for (InsertIntoTableCommand logicalPlan : plans) {
+ InsertTask task = new InsertTask(logicalPlan, ctx, stmtExecutor, loadStatistic);
+ task.setJobId(getJobId());
+ task.setTaskType(taskType);
+ idToTasks.put(task.getTaskId(), task);
+ initTasks(tasks);
+ }
+ return new ArrayList<>(idToTasks.values());
}
public void addNewTask(long id) {
Review Comment:
```suggestion
private void addNewTask(long id) {
```
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertIntoTableCommand.java:
##########
@@ -101,6 +105,14 @@ public void setJobId(long jobId) {
@Override
public void run(ConnectContext ctx, StmtExecutor executor) throws Exception {
+ runInternal(ctx, executor);
+ }
+
+ public void statefulRun(ConnectContext ctx, StmtExecutor executor) throws Exception {
Review Comment:
It looks like `statefulRun()` is the same as `run()`?
##########
fe/fe-core/src/main/java/org/apache/doris/load/DataTransFormMgr.java:
##########
@@ -0,0 +1,42 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load;
+
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+public abstract class DataTransFormMgr {
Review Comment:
What is this class for?
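Judging only from the hunk (it imports `ReentrantReadWriteLock`, and `LoadMgr` calls `writeLock()`, `writeUnlock()`, `readLock()` and `readUnlock()`), the class appears to share read/write locking helpers. A minimal sketch of such a base, with hypothetical names and an assumed fairness setting:

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

public abstract class LockingMgrSketch {
    // Fairness flag is an assumption; the real class may construct the lock differently.
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);

    protected void readLock() { lock.readLock().lock(); }
    protected void readUnlock() { lock.readLock().unlock(); }
    protected void writeLock() { lock.writeLock().lock(); }
    protected void writeUnlock() { lock.writeLock().unlock(); }

    // Example subclass: mutations take the write lock, reads take the read lock,
    // and every unlock sits in a finally block.
    static final class CounterMgr extends LockingMgrSketch {
        private int count;

        void increment() {
            writeLock();
            try {
                count++;
            } finally {
                writeUnlock();
            }
        }

        int get() {
            readLock();
            try {
                return count;
            } finally {
                readUnlock();
            }
        }
    }
}
```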
##########
fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java:
##########
@@ -75,55 +118,147 @@ public class InsertJob extends AbstractJob<InsertTask, Map> {
COLUMN_TO_INDEX = builder.build();
}
- @SerializedName(value = "lp")
- String labelPrefix;
+ @SerializedName("taskIdList")
+ ConcurrentLinkedQueue<Long> taskIdList;
- InsertIntoTableCommand command;
+ private final long dbId;
+ private String labelName;
+ private List<InsertIntoTableCommand> plans;
+ private InsertJob.LoadType loadType;
+ // 0: the job status is pending
+ // n/100: n is the number of task which has been finished
+ // 99: all tasks have been finished
+ // 100: txn status is visible and load has been finished
+ private int progress;
+ private long createTimestamp = System.currentTimeMillis();
+ private FailMsg failMsg;
+ private LoadStatistic loadStatistic = new LoadStatistic();
+ private Set<Long> finishedTaskIds = new HashSet<>();
+ private Set<String> tableNames;
+ private ConcurrentHashMap<Long, InsertTask> idToTasks = new ConcurrentHashMap<>();
+ private Map<String, String> properties;
+ private AuthorizationInfo authorizationInfo;
+ private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
+
+ private ConnectContext ctx;
+ private StmtExecutor stmtExecutor;
+ private List<ErrorTabletInfo> errorTabletInfos = new ArrayList<>();
+ private List<TabletCommitInfo> commitInfos = new ArrayList<>();
- StmtExecutor stmtExecutor;
+ // max save task num, do we need to config it?
+ private static final int MAX_SAVE_TASK_NUM = 100;
- ConnectContext ctx;
+ /**
+ * load job type
+ */
+ public enum LoadType {
+ BULK,
+ SPARK,
+ LOCAL_FILE,
+ UNKNOWN
- @SerializedName("tis")
- ConcurrentLinkedQueue<Long> taskIdList;
+ }
- // max save task num, do we need to config it?
- private static final int MAX_SAVE_TASK_NUM = 100;
+ public InsertJob(ReplayLoadLog.ReplayCreateLoadLog replayLoadLog) {
+ super(replayLoadLog.getId());
+ setJobId(replayLoadLog.getId());
+ this.dbId = replayLoadLog.getDbId();
+ }
+
+ public InsertJob(Long jobId, String jobName,
+ JobStatus jobStatus,
+ String currentDbName,
+ String comment,
+ UserIdentity createUser,
+ JobExecutionConfiguration jobConfig,
+ Long createTimeMs,
+ String executeSql) {
+ super(jobId, jobName, jobStatus, currentDbName, comment, createUser,
+ jobConfig, createTimeMs, executeSql, null);
+ this.dbId = ConnectContext.get().getCurrentDbId();
+ }
+
+ public InsertJob(ConnectContext ctx,
+ StmtExecutor executor,
+ String labelName,
+ List<InsertIntoTableCommand> plans,
+ Set<String> sinkTableNames,
+ Map<String, String> properties,
+ String comment,
+ JobExecutionConfiguration jobConfig) {
+ super(Env.getCurrentEnv().getNextId(), labelName, JobStatus.RUNNING, null,
+ comment, ctx.getCurrentUserIdentity(), jobConfig);
+ this.ctx = ctx;
+ this.plans = plans;
+ this.stmtExecutor = executor;
+ this.dbId = ctx.getCurrentDbId();
+ this.labelName = labelName;
+ this.tableNames = sinkTableNames;
+ this.properties = properties;
+ // TODO: not support other type yet
+ this.loadType = InsertJob.LoadType.BULK;
+ }
@Override
public List<InsertTask> createTasks(TaskType taskType, Map taskContext) {
- //nothing need to do in insert job
- InsertTask task = new InsertTask(null, getCurrentDbName(), getExecuteSql(), getCreateUser());
- task.setJobId(getJobId());
- task.setTaskType(taskType);
- task.setTaskId(Env.getCurrentEnv().getNextId());
+ if (plans.isEmpty()) {
+ InsertTask task = new InsertTask(labelName, getCurrentDbName(), getExecuteSql(), getCreateUser());
+ task.setJobId(getJobId());
+ task.setTaskType(taskType);
+ task.setTaskId(Env.getCurrentEnv().getNextId());
+ ArrayList<InsertTask> tasks = new ArrayList<>();
+ tasks.add(task);
+ super.initTasks(tasks);
+ addNewTask(task.getTaskId());
+ return tasks;
+ } else {
+ return createBatchTasks(taskType);
+ }
+ }
+
+ private List<InsertTask> createBatchTasks(TaskType taskType) {
Review Comment:
Why not call `addNewTask()` in `createBatchTasks()`?
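As the hunk reads, `createBatchTasks()` passes `tasks` to `initTasks()` without ever adding to it, and never calls `addNewTask()`. A minimal sketch of the batch path registering every task the same way the single-task branch does. It assumes `addNewTask()` appends the id and trims to `MAX_SAVE_TASK_NUM`, and it uses an `AtomicLong` in place of `Env.getNextId()`; both are assumptions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;

public class BatchTaskSketch {
    private static final int MAX_SAVE_TASK_NUM = 100; // same cap as in the diff
    private final AtomicLong idGenerator = new AtomicLong(); // stand-in for Env.getNextId()
    private final ConcurrentLinkedQueue<Long> taskIdList = new ConcurrentLinkedQueue<>();

    // Assumed behaviour: keep the most recent ids, dropping the oldest beyond the cap.
    private void addNewTask(long id) {
        taskIdList.add(id);
        while (taskIdList.size() > MAX_SAVE_TASK_NUM) {
            taskIdList.poll();
        }
    }

    // One task per plan; each task gets an id, is collected, and is registered.
    List<Long> createBatchTasks(List<String> plans) {
        List<Long> tasks = new ArrayList<>();
        for (String ignored : plans) {
            long taskId = idGenerator.incrementAndGet();
            tasks.add(taskId);
            addNewTask(taskId);
        }
        return tasks;
    }

    int savedTaskCount() {
        return taskIdList.size();
    }
}
```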
Re: [PR] [feature](Load)(step2)support nereids load job schedule [doris]
Posted by "wsjz (via GitHub)" <gi...@apache.org>.
wsjz commented on PR #26356:
URL: https://github.com/apache/doris/pull/26356#issuecomment-1855388785
run buildall
Re: [PR] [feature](Load)(step2)support nereids load job schedule [doris]
Posted by "doris-robot (via GitHub)" <gi...@apache.org>.
doris-robot commented on PR #26356:
URL: https://github.com/apache/doris/pull/26356#issuecomment-1855426099
<details>
<summary>TPC-H test result on machine: 'aliyun_ecs.c7a.8xlarge_32C64G'</summary>
```
Tpch sf100 test result on commit c3ecbe8b5077a06b1ebdb7f378765228625ae569, data reload: false
run tpch-sf100 query with default conf and session variables
query cold_run hot_run_1 hot_run_2 best_hot_run (ms)
q1 4754 4480 4477 4477
q2 364 152 158 152
q3 1477 1259 1216 1216
q4 1113 903 911 903
q5 3141 3167 3189 3167
q6 247 130 129 129
q7 979 486 484 484
q8 2218 2194 2199 2194
q9 6682 6654 6646 6646
q10 3219 3273 3255 3255
q11 321 204 201 201
q12 351 211 212 211
q13 4557 3815 3786 3786
q14 249 211 211 211
q15 580 523 523 523
q16 440 387 387 387
q17 1008 595 528 528
q18 7174 6858 6870 6858
q19 1530 1378 1414 1378
q20 520 320 292 292
q21 3069 2627 2676 2627
q22 350 290 292 290
Total cold run time: 44343 ms
Total hot run time: 39915 ms
run tpch-sf100 query with default conf and set session variable runtime_filter_mode=off
query cold_run hot_run_1 hot_run_2 best_hot_run (ms)
q1 4398 4423 4388 4388
q2 268 165 168 165
q3 3532 3535 3520 3520
q4 2403 2386 2385 2385
q5 5744 5719 5776 5719
q6 240 121 125 121
q7 2406 1917 1836 1836
q8 3517 3525 3527 3525
q9 9038 9037 8981 8981
q10 3932 4003 4022 4003
q11 506 384 379 379
q12 769 592 594 592
q13 4280 3564 3549 3549
q14 284 256 260 256
q15 574 523 522 522
q16 502 466 461 461
q17 1866 1863 1865 1863
q18 8636 8267 8319 8267
q19 1757 1747 1745 1745
q20 2269 1968 1932 1932
q21 6515 6179 6179 6179
q22 494 414 428 414
Total cold run time: 63930 ms
Total hot run time: 60802 ms
```
</details>
Re: [PR] [feature](Load)(step2)support nereids load job schedule [doris]
Posted by "wsjz (via GitHub)" <gi...@apache.org>.
wsjz commented on PR #26356:
URL: https://github.com/apache/doris/pull/26356#issuecomment-1858744908
run p0
Re: [PR] [feature](Load)(step2)support nereids load job schedule [doris]
Posted by "doris-robot (via GitHub)" <gi...@apache.org>.
doris-robot commented on PR #26356:
URL: https://github.com/apache/doris/pull/26356#issuecomment-1859517070
TPC-H test result on machine: 'aliyun_ecs.c7a.8xlarge_32C64G'
```
Tpch sf100 test result on commit db2cd09edf366d66eb55caae3f4fd6d5f0b49ea2, data reload: false
run tpch-sf100 query with default conf and session variables
query cold_run hot_run_1 hot_run_2 best_hot_run (ms)
q1 4749 4506 4497 4497
q2 363 151 159 151
q3 1463 1253 1198 1198
q4 1121 925 901 901
q5 3165 3166 3127 3127
q6 247 128 130 128
q7 986 486 475 475
q8 2219 2216 2187 2187
q9 6659 6650 6655 6650
q10 3201 3259 3249 3249
q11 329 203 209 203
q12 354 214 216 214
q13 4566 3800 3871 3800
q14 241 218 209 209
q15 570 537 524 524
q16 439 392 390 390
q17 1005 648 577 577
q18 7218 7034 6934 6934
q19 1552 1494 1363 1363
q20 537 319 300 300
q21 3040 2634 2664 2634
q22 346 283 281 281
Total cold run time: 44370 ms
Total hot run time: 39992 ms
run tpch-sf100 query with default conf and set session variable runtime_filter_mode=off
query cold_run hot_run_1 hot_run_2 best_hot_run (ms)
q1 4490 4396 4434 4396
q2 267 161 171 161
q3 3549 3527 3530 3527
q4 2392 2386 2382 2382
q5 5721 5740 5729 5729
q6 244 122 123 122
q7 2369 1849 1868 1849
q8 3520 3514 3512 3512
q9 9035 9007 9011 9007
q10 3904 3959 3996 3959
q11 506 392 386 386
q12 771 596 595 595
q13 4317 3546 3570 3546
q14 283 259 259 259
q15 568 518 525 518
q16 514 471 481 471
q17 1861 1872 1829 1829
q18 8648 8167 8298 8167
q19 1724 1758 1758 1758
q20 2254 1945 1937 1937
q21 6563 6211 6155 6155
q22 504 428 437 428
Total cold run time: 64004 ms
Total hot run time: 60693 ms
```
Re: [PR] [feature](Load)(step2)support nereids load job schedule [doris]
Posted by "wsjz (via GitHub)" <gi...@apache.org>.
wsjz commented on PR #26356:
URL: https://github.com/apache/doris/pull/26356#issuecomment-1859516166
run buildall
Re: [PR] [feature](Load)(step2)support nereids load job schedule [doris]
Posted by "wsjz (via GitHub)" <gi...@apache.org>.
wsjz commented on PR #26356:
URL: https://github.com/apache/doris/pull/26356#issuecomment-1859649578
run buildall
Re: [PR] [feature](Load)(step2)support nereids load job schedule [doris]
Posted by "wsjz (via GitHub)" <gi...@apache.org>.
wsjz commented on code in PR #26356:
URL: https://github.com/apache/doris/pull/26356#discussion_r1427690881
##########
fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java:
##########
@@ -65,58 +100,171 @@ public class InsertJob extends AbstractJob<InsertTask, Map> {
new Column("CreateTime", ScalarType.createStringType()),
new Column("Comment", ScalarType.createStringType()));
+ private static final ShowResultSetMetaData TASK_META_DATA =
+ ShowResultSetMetaData.builder()
+ .addColumn(new Column("TaskId", ScalarType.createVarchar(20)))
+ .addColumn(new Column("Label", ScalarType.createVarchar(20)))
+ .addColumn(new Column("Status", ScalarType.createVarchar(20)))
+ .addColumn(new Column("EtlInfo", ScalarType.createVarchar(20)))
+ .addColumn(new Column("TaskInfo", ScalarType.createVarchar(20)))
+ .addColumn(new Column("ErrorMsg", ScalarType.createVarchar(20)))
+
+ .addColumn(new Column("CreateTimeMs", ScalarType.createVarchar(20)))
+ .addColumn(new Column("FinishTimeMs", ScalarType.createVarchar(20)))
+ .addColumn(new Column("TrackingUrl", ScalarType.createVarchar(20)))
+ .addColumn(new Column("LoadStatistic", ScalarType.createVarchar(20)))
+ .addColumn(new Column("User", ScalarType.createVarchar(20)))
+ .build();
+
public static final ImmutableMap<String, Integer> COLUMN_TO_INDEX;
static {
- ImmutableMap.Builder<String, Integer> builder = new ImmutableMap.Builder();
+ ImmutableMap.Builder<String, Integer> builder = new ImmutableMap.Builder<>();
for (int i = 0; i < SCHEMA.size(); i++) {
builder.put(SCHEMA.get(i).getName().toLowerCase(), i);
}
COLUMN_TO_INDEX = builder.build();
}
- @SerializedName(value = "lp")
- String labelPrefix;
+ @SerializedName("taskIdList")
+ ConcurrentLinkedQueue<Long> taskIdList;
+ @SerializedName("dbId")
+ private final long dbId;
+ @SerializedName("labelName")
+ private String labelName;
+ @SerializedName("loadType")
+ private InsertJob.LoadType loadType;
+ // 0: the job status is pending
+ // n/100: n is the number of task which has been finished
+ // 99: all tasks have been finished
+ // 100: txn status is visible and load has been finished
+ @SerializedName("progress")
+ private int progress;
+ @SerializedName("failMsg")
+ private FailMsg failMsg;
+ @SerializedName("plans")
+ private List<InsertIntoTableCommand> plans;
+ private LoadStatistic loadStatistic = new LoadStatistic();
+ private Set<Long> finishedTaskIds = new HashSet<>();
+ private Set<String> tableNames;
+ private ConcurrentHashMap<Long, InsertTask> idToTasks = new ConcurrentHashMap<>();
+ private Map<String, String> properties;
+ private AuthorizationInfo authorizationInfo;
+ private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
+
+ private ConnectContext ctx;
+ private StmtExecutor stmtExecutor;
+ private List<ErrorTabletInfo> errorTabletInfos = new ArrayList<>();
+ private List<TabletCommitInfo> commitInfos = new ArrayList<>();
+
+ // max save task num, do we need to config it?
+ private static final int MAX_SAVE_TASK_NUM = 100;
+
+ /**
+ * load job type
+ */
+ public enum LoadType {
+ BULK,
+ SPARK,
+ LOCAL_FILE,
+ UNKNOWN
+
+ }
+
+ public enum Priority {
+ HIGH(0),
+ NORMAL(1),
+ LOW(2);
- InsertIntoTableCommand command;
+ Priority(int value) {
+ this.value = value;
+ }
- StmtExecutor stmtExecutor;
+ private final int value;
- ConnectContext ctx;
+ public int getValue() {
+ return value;
+ }
+ }
- @SerializedName("tis")
- ConcurrentLinkedQueue<Long> taskIdList;
+ public InsertJob(Long jobId, String jobName,
+ JobStatus jobStatus,
+ LabelName labelName,
+ String comment,
+ UserIdentity createUser,
+ JobExecutionConfiguration jobConfig,
+ Long createTimeMs,
+ String executeSql) {
+ super(jobId, jobName, jobStatus, labelName.getDbName(), comment, createUser,
+ jobConfig, createTimeMs, executeSql, null);
+ this.dbId = ConnectContext.get().getCurrentDbId();
+ this.labelName = labelName.getLabelName();
+ }
- // max save task num, do we need to config it?
- private static final int MAX_SAVE_TASK_NUM = 100;
+ public InsertJob(ConnectContext ctx,
+ StmtExecutor executor,
+ String labelName,
+ List<InsertIntoTableCommand> plans,
+ Set<String> sinkTableNames,
+ Map<String, String> properties,
+ String comment,
+ JobExecutionConfiguration jobConfig) {
+ super(Env.getCurrentEnv().getNextId(), labelName, JobStatus.RUNNING, null,
+ comment, ctx.getCurrentUserIdentity(), jobConfig);
+ this.ctx = ctx;
+ this.plans = plans;
+ this.stmtExecutor = executor;
+ this.dbId = ctx.getCurrentDbId();
+ this.labelName = labelName;
+ this.tableNames = sinkTableNames;
+ this.properties = properties;
+ // TODO: not support other type yet
+ this.loadType = InsertJob.LoadType.BULK;
+ }
@Override
- public List<InsertTask> createTasks(TaskType taskType, Map taskContext) {
- //nothing need to do in insert job
- InsertTask task = new InsertTask(null, getCurrentDbName(), getExecuteSql(), getCreateUser());
- task.setJobId(getJobId());
- task.setTaskType(taskType);
- task.setTaskId(Env.getCurrentEnv().getNextId());
- ArrayList<InsertTask> tasks = new ArrayList<>();
- tasks.add(task);
- super.initTasks(tasks);
- addNewTask(task.getTaskId());
- return tasks;
+ public List<InsertTask> createTasks(TaskType taskType, Map<Object, Object> taskContext) {
+ if (plans.isEmpty()) {
+ InsertTask task = new InsertTask(labelName, getCurrentDbName(), getExecuteSql(), getCreateUser());
+ task.setTaskType(taskType);
+ task.setJobId(getJobId());
+ task.setCreateTimeMs(System.currentTimeMillis());
+ task.setStatus(TaskStatus.PENDING);
+ ArrayList<InsertTask> tasks = new ArrayList<>();
+ tasks.add(task);
+ super.initTasks(tasks);
+ addNewTask(task.getTaskId());
+ return tasks;
+ } else {
+ return createBatchTasks(taskType);
+ }
}
- public void addNewTask(long id) {
+ private List<InsertTask> createBatchTasks(TaskType taskType) {
Review Comment:
I will refactor this in the future, because there are many conflicts in this method.
Re: [PR] [feature](Load)(step2)support nereids load job schedule [doris]
Posted by "wsjz (via GitHub)" <gi...@apache.org>.
wsjz commented on code in PR #26356:
URL: https://github.com/apache/doris/pull/26356#discussion_r1427703299
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/load/LoadMgr.java:
##########
@@ -0,0 +1,340 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.jobs.load;
+
+import org.apache.doris.analysis.CompoundPredicate;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.CaseSensibility;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.LabelAlreadyUsedException;
+import org.apache.doris.common.PatternMatcher;
+import org.apache.doris.common.PatternMatcherWrapper;
+import org.apache.doris.common.util.LogBuilder;
+import org.apache.doris.common.util.LogKey;
+import org.apache.doris.job.base.JobExecuteType;
+import org.apache.doris.job.base.JobExecutionConfiguration;
+import org.apache.doris.job.common.JobStatus;
+import org.apache.doris.job.exception.JobException;
+import org.apache.doris.job.extensions.insert.InsertJob;
+import org.apache.doris.job.manager.JobManager;
+import org.apache.doris.load.loadv2.JobState;
+import org.apache.doris.nereids.jobs.load.replay.ReplayLoadLog;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+
+/**
+ * load manager
+ */
+public class LoadMgr {
+ private static final Logger LOG = LogManager.getLogger(LoadMgr.class);
+ private Map<Long, InsertJob> loadIdToJob = new HashMap<>();
+ private final Map<Long, Map<String, List<InsertJob>>> dbIdToLabelToLoadJobs = new ConcurrentHashMap<>();
+
+ // lock for export job
+ // lock is private and must use after db lock
+ private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
+
+ private void readLock() {
+ lock.readLock().lock();
+ }
+
+ private void readUnlock() {
+ lock.readLock().unlock();
+ }
+
+ private void writeLock() {
+ lock.writeLock().lock();
+ }
+
+ private void writeUnlock() {
+ lock.writeLock().unlock();
+ }
+
+ private JobManager<InsertJob, ?> getJobManager() {
+ return Env.getCurrentEnv().getJobManager();
+ }
+
+ /**
+ * add load job and add tasks
+ * @param loadJob job
+ */
+ public void addLoadJob(InsertJob loadJob) throws DdlException {
+ writeLock();
+ try {
+ Map<String, List<InsertJob>> labelToLoadJobs = dbIdToLabelToLoadJobs.get(loadJob.getDbId());
+ if (labelToLoadJobs != null && labelToLoadJobs.containsKey(loadJob.getLabel())) {
+ throw new LabelAlreadyUsedException(loadJob.getLabel());
+ }
+ unprotectAddJob(loadJob);
+ } catch (LabelAlreadyUsedException e) {
+ throw new RuntimeException(e);
+ } finally {
+ writeUnlock();
+ }
+ }
+
+ private void unprotectAddJob(InsertJob job) throws DdlException {
+ loadIdToJob.put(job.getJobId(), job);
+ try {
+ getJobManager().registerJob(job);
+ if (!dbIdToLabelToLoadJobs.containsKey(job.getDbId())) {
+ dbIdToLabelToLoadJobs.put(job.getDbId(), new ConcurrentHashMap<>());
+ }
+ Map<String, List<InsertJob>> labelToLoadJobs = dbIdToLabelToLoadJobs.get(job.getDbId());
+ if (!labelToLoadJobs.containsKey(job.getLabel())) {
+ labelToLoadJobs.put(job.getLabel(), new ArrayList<>());
+ }
+ labelToLoadJobs.get(job.getLabel()).add(job);
+ } catch (org.apache.doris.job.exception.JobException e) {
+ throw new DdlException(e.getMessage(), e);
+ }
+ }
+
+ /**
+ * replay load job
+ * @param replayLoadLog load log
+ * @throws DdlException ex
+ */
+ public void replayLoadJob(ReplayLoadLog replayLoadLog) throws DdlException {
+ writeLock();
+ try {
+ if (replayLoadLog instanceof ReplayLoadLog.ReplayCreateLoadLog) {
+ InsertJob loadJob = new InsertJob((ReplayLoadLog.ReplayCreateLoadLog) replayLoadLog);
+ JobExecutionConfiguration jobConfig = new JobExecutionConfiguration();
+ jobConfig.setExecuteType(JobExecuteType.INSTANT);
+ loadJob.setJobConfig(jobConfig);
+ addLoadJob(loadJob);
+ LOG.info(new LogBuilder(LogKey.LOAD_JOB, loadJob.getJobId()).add("msg", "replay create load job")
+ .build());
+ } else if (replayLoadLog instanceof ReplayLoadLog.ReplayEndLoadLog) {
+ InsertJob job = loadIdToJob.get(replayLoadLog.getId());
+ if (job == null) {
+ // This should not happen.
+ // Last time I found that when user submit a job with already used label, an END_LOAD_JOB edit log
+ // will be written but the job is not added to 'idToLoadJob', so this job here we got will be null.
+ // And this bug has been fixed.
+ // Just add a log here to observe.
+ LOG.warn("job does not exist when replaying end load job edit log: {}", replayLoadLog);
+ return;
+ }
+ job.unprotectReadEndOperation((ReplayLoadLog.ReplayEndLoadLog) replayLoadLog);
+ LOG.info(new LogBuilder(LogKey.LOAD_JOB, replayLoadLog.getId()).add("operation", replayLoadLog)
+ .add("msg", "replay end load job").build());
+ } else {
+ throw new DdlException("Unsupported replay job type. ");
+ }
+ } finally {
+ writeUnlock();
+ }
+ }
+
+ // public void updateJobProgress(Long jobId, Long beId, TUniqueId loadId, TUniqueId fragmentId, long scannedRows,
+ // long scannedBytes, boolean isDone) {
+ // LoadJobExecutor job = loadIdToJob.get(jobId);
+ // if (job != null) {
+ // job.updateLoadingStatus(beId, loadId, fragmentId, scannedRows, scannedBytes, isDone);
+ // }
+ // }
+
+ /**
+ * cancel job
+ *
+ * @param dbName dbName
+ * @param label job label
+ * @param state job state
+ * @param operator filter operator, like or equals
+ */
+ public void cancelLoadJob(String dbName, String label, String state, CompoundPredicate.Operator operator)
+ throws JobException, AnalysisException, DdlException {
+ Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(dbName);
+ // List of load jobs waiting to be cancelled
+ List<InsertJob> uncompletedLoadJob;
+ readLock();
+ try {
+ Map<String, List<InsertJob>> labelToLoadJobs = dbIdToLabelToLoadJobs.get(db.getId());
+ if (labelToLoadJobs == null) {
+ throw new JobException("Load job does not exist");
+ }
+ List<InsertJob> matchLoadJobs = Lists.newArrayList();
+ addNeedCancelLoadJob(label, state, operator,
+ labelToLoadJobs.values().stream().flatMap(Collection::stream).collect(Collectors.toList()),
+ matchLoadJobs);
+ if (matchLoadJobs.isEmpty()) {
+ throw new JobException("Load job does not exist");
+ }
+ // check state here
+ uncompletedLoadJob =
+ matchLoadJobs.stream().filter(InsertJob::isRunning)
+ .collect(Collectors.toList());
+ if (uncompletedLoadJob.isEmpty()) {
+ throw new JobException("There is no uncompleted job");
+ }
+ } finally {
+ readUnlock();
+ }
+ for (InsertJob loadJob : uncompletedLoadJob) {
+ try {
+ loadJob.cancelJob();
+ } catch (JobException e) {
+ LOG.warn("Fail to cancel job, its label: {}", loadJob.getLabel());
+ }
+ }
+ }
+
+ private static void addNeedCancelLoadJob(String label, String state,
Review Comment:
Not required. I suggest making a method static if it doesn't use any instance fields.
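A sketch of that suggestion, not the PR's code: a label filter like `addNeedCancelLoadJob` only reads its arguments, so declaring it `static` documents that it cannot touch `LoadMgr` state. `LoadJobView`, `MatchMode` and the hand-rolled LIKE matcher are hypothetical stand-ins for `InsertJob`, the method's operator argument and Doris's `PatternMatcher`; the state filter is left out, and the method returns its matches instead of filling an output list.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

public class CancelFilter {
    /** Hypothetical, simplified view of a load job: only its label. */
    static final class LoadJobView {
        final String label;

        LoadJobView(String label) {
            this.label = label;
        }
    }

    /** Hypothetical match mode: exact label or SQL LIKE pattern. */
    enum MatchMode { EXACT, LIKE }

    // static: reads only its parameters and no instance fields,
    // so callers need no LoadMgr instance and no shared state is involved.
    static List<LoadJobView> matchByLabel(String label, MatchMode mode, List<LoadJobView> jobs) {
        Pattern like = mode == MatchMode.LIKE ? likeToRegex(label) : null;
        List<LoadJobView> matched = new ArrayList<>();
        for (LoadJobView job : jobs) {
            boolean hit = mode == MatchMode.EXACT
                    ? job.label.equals(label)
                    : like.matcher(job.label).matches();
            if (hit) {
                matched.add(job);
            }
        }
        return matched;
    }

    // Minimal SQL LIKE translation (no escape character support):
    // '%' = any run of characters, '_' = exactly one character, everything else literal.
    static Pattern likeToRegex(String pattern) {
        StringBuilder regex = new StringBuilder();
        for (char c : pattern.toCharArray()) {
            if (c == '%') {
                regex.append(".*");
            } else if (c == '_') {
                regex.append('.');
            } else {
                regex.append(Pattern.quote(String.valueOf(c)));
            }
        }
        return Pattern.compile(regex.toString());
    }
}
```

Because the helper is static and side-effect free, it can be unit-tested without building a `LoadMgr`.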
Re: [PR] [feature](Load)(step2)support nereids load job schedule [doris]
Posted by "wsjz (via GitHub)" <gi...@apache.org>.
wsjz commented on PR #26356:
URL: https://github.com/apache/doris/pull/26356#issuecomment-1862002157
run clickbench
Re: [PR] [feature](Load)(step2)support nereids load job schedule [doris]
Posted by "wsjz (via GitHub)" <gi...@apache.org>.
wsjz commented on PR #26356:
URL: https://github.com/apache/doris/pull/26356#issuecomment-1862002061
run p0
Re: [PR] [feature](Load)(step2)support nereids load job schedule [doris]
Posted by "morningman (via GitHub)" <gi...@apache.org>.
morningman commented on code in PR #26356:
URL: https://github.com/apache/doris/pull/26356#discussion_r1427583758
##########
fe/fe-common/src/main/java/org/apache/doris/common/Config.java:
##########
@@ -1899,6 +1899,14 @@ public class Config extends ConfigBase {
@ConfField(masterOnly = true)
public static boolean enable_hms_events_incremental_sync = false;
+ /**
+ * If set to true, doris will try to parse the ddl of a hive view and try to execute the query
+ * otherwise it will throw an AnalysisException.
+ */
+ @ConfField(mutable = true, varType = VariableAnnotation.EXPERIMENTAL)
Review Comment:
Add a `description` field.
##########
fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java:
##########
@@ -54,35 +54,73 @@
@Log4j2
public abstract class AbstractJob<T extends AbstractTask, C> implements Job<T, C>, Writable {
- @SerializedName(value = "jid")
+ @SerializedName(value = "jobId")
private Long jobId;
- @SerializedName(value = "jn")
+ @SerializedName(value = "jobName")
private String jobName;
- @SerializedName(value = "js")
+ @SerializedName(value = "jobStatus")
private JobStatus jobStatus;
- @SerializedName(value = "cdb")
+ @SerializedName(value = "currentDbName")
private String currentDbName;
- @SerializedName(value = "c")
+ @SerializedName(value = "comment")
private String comment;
- @SerializedName(value = "cu")
+ @SerializedName(value = "createUser")
private UserIdentity createUser;
- @SerializedName(value = "jc")
+ @SerializedName(value = "jobConfig")
private JobExecutionConfiguration jobConfig;
- @SerializedName(value = "ctms")
- private Long createTimeMs;
+ @SerializedName(value = "createTimeMs")
+ private Long createTimeMs = -1L;
+
+ @SerializedName(value = "startTimeMs")
+ private Long startTimeMs = -1L;
+
+ @SerializedName(value = "finishTimeMs")
+ private Long finishTimeMs = -1L;
@SerializedName(value = "sql")
String executeSql;
- @SerializedName(value = "ftm")
- private long finishTimeMs;
+ public AbstractJob() {}
+
+ public AbstractJob(Long id) {
+ setJobId(id);
+ }
+
+ public AbstractJob(Long jobId, String jobName, JobStatus jobStatus,
Review Comment:
Add a comment to explain the difference between these two constructors.
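One way such comments could read, as a sketch only: the purposes stated below (deserialization vs. creating a new job) are assumptions, since the hunk does not say why each constructor exists. `SketchJob` and `DemoJob` are hypothetical, trimmed-down stand-ins for `AbstractJob` and a subclass.

```java
public class JobConstructorsDemo {
    /** Hypothetical, trimmed-down stand-in for AbstractJob. */
    abstract static class SketchJob {
        private Long jobId;
        private String jobName;
        private long createTimeMs = -1L;

        /**
         * Assumed purpose: deserialization only (e.g. rebuilding a job from the edit log).
         * All fields are filled in afterwards from the persisted state.
         */
        protected SketchJob() {
        }

        /**
         * Assumed purpose: creating a new job when the user submits it.
         * The caller supplies the id; the creation time is stamped now.
         */
        protected SketchJob(Long jobId, String jobName) {
            this.jobId = jobId;
            this.jobName = jobName;
            this.createTimeMs = System.currentTimeMillis();
        }

        Long getJobId() {
            return jobId;
        }

        String getJobName() {
            return jobName;
        }

        long getCreateTimeMs() {
            return createTimeMs;
        }
    }

    /** Minimal concrete subclass so both constructors can be exercised. */
    static final class DemoJob extends SketchJob {
        DemoJob() {
            super();
        }

        DemoJob(Long jobId, String jobName) {
            super(jobId, jobName);
        }
    }
}
```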
##########
fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java:
##########
@@ -65,58 +100,171 @@ public class InsertJob extends AbstractJob<InsertTask, Map> {
new Column("CreateTime", ScalarType.createStringType()),
new Column("Comment", ScalarType.createStringType()));
+ private static final ShowResultSetMetaData TASK_META_DATA =
+ ShowResultSetMetaData.builder()
+ .addColumn(new Column("TaskId", ScalarType.createVarchar(20)))
+ .addColumn(new Column("Label", ScalarType.createVarchar(20)))
+ .addColumn(new Column("Status", ScalarType.createVarchar(20)))
+ .addColumn(new Column("EtlInfo", ScalarType.createVarchar(20)))
+ .addColumn(new Column("TaskInfo", ScalarType.createVarchar(20)))
+ .addColumn(new Column("ErrorMsg", ScalarType.createVarchar(20)))
+
+ .addColumn(new Column("CreateTimeMs", ScalarType.createVarchar(20)))
+ .addColumn(new Column("FinishTimeMs", ScalarType.createVarchar(20)))
+ .addColumn(new Column("TrackingUrl", ScalarType.createVarchar(20)))
+ .addColumn(new Column("LoadStatistic", ScalarType.createVarchar(20)))
+ .addColumn(new Column("User", ScalarType.createVarchar(20)))
+ .build();
+
public static final ImmutableMap<String, Integer> COLUMN_TO_INDEX;
static {
- ImmutableMap.Builder<String, Integer> builder = new ImmutableMap.Builder();
+ ImmutableMap.Builder<String, Integer> builder = new ImmutableMap.Builder<>();
for (int i = 0; i < SCHEMA.size(); i++) {
builder.put(SCHEMA.get(i).getName().toLowerCase(), i);
}
COLUMN_TO_INDEX = builder.build();
}
- @SerializedName(value = "lp")
- String labelPrefix;
+ @SerializedName("taskIdList")
+ ConcurrentLinkedQueue<Long> taskIdList;
+ @SerializedName("dbId")
+ private final long dbId;
+ @SerializedName("labelName")
+ private String labelName;
+ @SerializedName("loadType")
+ private InsertJob.LoadType loadType;
+ // 0: the job status is pending
+ // n/100: n is the number of task which has been finished
+ // 99: all tasks have been finished
+ // 100: txn status is visible and load has been finished
+ @SerializedName("progress")
+ private int progress;
+ @SerializedName("failMsg")
+ private FailMsg failMsg;
+ @SerializedName("plans")
+ private List<InsertIntoTableCommand> plans;
+ private LoadStatistic loadStatistic = new LoadStatistic();
+ private Set<Long> finishedTaskIds = new HashSet<>();
+ private Set<String> tableNames;
+ private ConcurrentHashMap<Long, InsertTask> idToTasks = new ConcurrentHashMap<>();
+ private Map<String, String> properties;
+ private AuthorizationInfo authorizationInfo;
+ private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
+
+ private ConnectContext ctx;
+ private StmtExecutor stmtExecutor;
+ private List<ErrorTabletInfo> errorTabletInfos = new ArrayList<>();
+ private List<TabletCommitInfo> commitInfos = new ArrayList<>();
+
+ // max save task num, do we need to config it?
+ private static final int MAX_SAVE_TASK_NUM = 100;
+
+ /**
+ * load job type
+ */
+ public enum LoadType {
+ BULK,
+ SPARK,
+ LOCAL_FILE,
+ UNKNOWN
+
+ }
+
+ public enum Priority {
+ HIGH(0),
+ NORMAL(1),
+ LOW(2);
- InsertIntoTableCommand command;
+ Priority(int value) {
+ this.value = value;
+ }
- StmtExecutor stmtExecutor;
+ private final int value;
- ConnectContext ctx;
+ public int getValue() {
+ return value;
+ }
+ }
- @SerializedName("tis")
- ConcurrentLinkedQueue<Long> taskIdList;
+ public InsertJob(Long jobId, String jobName,
+ JobStatus jobStatus,
+ LabelName labelName,
+ String comment,
+ UserIdentity createUser,
+ JobExecutionConfiguration jobConfig,
+ Long createTimeMs,
+ String executeSql) {
+ super(jobId, jobName, jobStatus, labelName.getDbName(), comment, createUser,
+ jobConfig, createTimeMs, executeSql, null);
+ this.dbId = ConnectContext.get().getCurrentDbId();
+ this.labelName = labelName.getLabelName();
+ }
- // max save task num, do we need to config it?
- private static final int MAX_SAVE_TASK_NUM = 100;
+ public InsertJob(ConnectContext ctx,
+ StmtExecutor executor,
+ String labelName,
+ List<InsertIntoTableCommand> plans,
+ Set<String> sinkTableNames,
+ Map<String, String> properties,
+ String comment,
+ JobExecutionConfiguration jobConfig) {
+ super(Env.getCurrentEnv().getNextId(), labelName, JobStatus.RUNNING, null,
Review Comment:
The job id is generated outside in one case and inside in the other.
In the first constructor it is passed in from outside; in the second constructor it is generated inside.
We should unify them.
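A sketch of one way to unify this, under stated assumptions: every constructor takes the id, and allocation moves into a static factory used only on the submit path, while replay passes the persisted id straight through. `NEXT_ID` (an `AtomicLong`) is a hypothetical stand-in for `Env.getCurrentEnv().getNextId()`, and `SketchInsertJob` is not the PR's `InsertJob`.

```java
import java.util.concurrent.atomic.AtomicLong;

public class JobIdDemo {
    // Hypothetical stand-in for Env.getCurrentEnv().getNextId(): one monotonically increasing id source.
    static final AtomicLong NEXT_ID = new AtomicLong(1000L);

    static final class SketchInsertJob {
        final long jobId;
        final String label;

        // Every construction path receives the id from the caller; none allocates one itself.
        SketchInsertJob(long jobId, String label) {
            this.jobId = jobId;
            this.label = label;
        }

        // Submit path: allocate a fresh id, then delegate to the single constructor.
        static SketchInsertJob create(String label) {
            return new SketchInsertJob(NEXT_ID.incrementAndGet(), label);
        }

        // Replay path: reuse the id that was persisted when the job was created.
        static SketchInsertJob replay(long persistedId, String label) {
            return new SketchInsertJob(persistedId, label);
        }
    }
}
```

With this shape the constructor never decides where an id comes from, so tests and replay can pass fixed ids.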
##########
fe/fe-core/src/main/java/org/apache/doris/job/base/Job.java:
##########
@@ -99,11 +99,34 @@ public interface Job<T extends AbstractTask, C> {
/**
* Cancels all running tasks of this job.
- *
* @throws JobException If cancelling a running task fails.
*/
void cancelAllTasks() throws JobException;
+ /**
+ * register job
+ * @throws JobException If register job failed.
+ */
+ void onRegister() throws JobException;
+
+ /**
+ * register job failed
+ * @throws JobException If failed.
+ */
+ void onRegisterFailed() throws JobException;
+
+ /**
+ * relay create job
+ * @throws JobException If replay create failed.
+ */
+ void onReplayCreate() throws JobException;
+
+ /**
+ * relay finished or cancelled job
Review Comment:
```suggestion
* replay finished or cancelled job
```
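This hunk adds lifecycle hooks but does not show who calls them. As a hedged sketch only: a manager could call `onRegister()` before publishing a job, `onRegisterFailed()` when that fails, and `onReplayCreate()` while replaying the edit log. `SketchJobManager`, `HookedJob`, the local `JobException` and this call order are assumptions, not the PR's `JobManager`.

```java
import java.util.HashMap;
import java.util.Map;

public class JobHooksDemo {
    /** Hypothetical stand-in for org.apache.doris.job.exception.JobException. */
    static class JobException extends Exception {
        JobException(String message) {
            super(message);
        }
    }

    /** The three hooks named in this hunk, on a hypothetical minimal job interface. */
    interface HookedJob {
        long getJobId();

        void onRegister() throws JobException;

        void onRegisterFailed() throws JobException;

        void onReplayCreate() throws JobException;
    }

    /** Hypothetical manager: one plausible order in which the hooks could be called. */
    static final class SketchJobManager {
        private final Map<Long, HookedJob> jobs = new HashMap<>();

        void registerJob(HookedJob job) throws JobException {
            try {
                job.onRegister();               // job-specific setup before it becomes visible
                jobs.put(job.getJobId(), job);
            } catch (JobException e) {
                try {
                    job.onRegisterFailed();     // give the job a chance to clean up
                } catch (JobException cleanup) {
                    e.addSuppressed(cleanup);   // keep the original failure as the primary error
                }
                throw e;
            }
        }

        void replayCreateJob(HookedJob job) throws JobException {
            job.onReplayCreate();               // rebuild in-memory state while replaying the log
            jobs.put(job.getJobId(), job);
        }

        boolean contains(long jobId) {
            return jobs.containsKey(jobId);
        }
    }
}
```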
##########
fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java:
##########
@@ -236,4 +290,135 @@ public T getJob(Long jobId) {
return jobMap.get(jobId);
}
+
+ /**
+ * get load info by db
+ *
+ * @param dbId db id
+ * @param dbName db name
+ * @param labelValue label name
+ * @param accurateMatch accurate match
+ * @param jobState state
+ * @return load infos
+ * @throws AnalysisException ex
+ */
+ public List<List<Comparable>> getLoadJobInfosByDb(long dbId, String dbName,
+ String labelValue,
+ boolean accurateMatch,
+ JobState jobState) throws AnalysisException {
+ LinkedList<List<Comparable>> loadJobInfos = new LinkedList<>();
+ if (!Env.getCurrentEnv().getLabelProcessor().existLabelJobs(dbId)) {
+ return loadJobInfos;
+ }
+ readLock();
+ try {
+ List<InsertJob> loadJobList = Env.getCurrentEnv().getLabelProcessor()
+ .filterLabelJobs(dbId, labelValue, accurateMatch);
+ // check state
+ for (InsertJob loadJob : loadJobList) {
+ try {
+ if (jobState != null && !validState(jobState, loadJob)) {
+ continue;
+ }
+ // add load job info, convert String list to Comparable list
+ loadJobInfos.add(new ArrayList<>(loadJob.getShowInfo()));
+ } catch (RuntimeException e) {
+ // ignore this load job
+ log.warn("get load job info failed. job id: {}", loadJob.getJobId(), e);
+ }
+ }
+ return loadJobInfos;
+ } finally {
+ readUnlock();
+ }
+ }
+
+ private static boolean validState(JobState jobState, InsertJob loadJob) {
+ JobStatus status = loadJob.getJobStatus();
+ switch (status) {
+ case RUNNING:
+ return jobState == JobState.PENDING || jobState == JobState.ETL
+ || jobState == JobState.LOADING || jobState == JobState.COMMITTED;
+ case STOPPED:
+ return jobState == JobState.CANCELLED;
+ case FINISHED:
+ return jobState == JobState.FINISHED;
+ default:
+ return false;
+ }
+ }
+
+ public void cancelLoadJob(CancelLoadStmt cs)
+ throws JobException, AnalysisException, DdlException {
+ String dbName = cs.getDbName();
+ String label = cs.getLabel();
+ String state = cs.getState();
+ CompoundPredicate.Operator operator = cs.getOperator();
+ Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(dbName);
+ // List of load jobs waiting to be cancelled
+ List<InsertJob> uncompletedLoadJob;
+ readLock();
+ try {
+ List<InsertJob> loadJobs = Env.getCurrentEnv().getLabelProcessor().getLabelJobs(db);
+ List<InsertJob> matchLoadJobs = Lists.newArrayList();
+ addNeedCancelLoadJob(label, state, operator, loadJobs, matchLoadJobs);
+ if (matchLoadJobs.isEmpty()) {
+ throw new JobException("Load job does not exist");
+ }
+ // check state here
+ uncompletedLoadJob =
Review Comment:
```suggestion
unfinishedLoadJob =
```
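Related to the state check here: the `validState` switch earlier in this hunk maps each scheduler `JobStatus` to the load `JobState` values it satisfies, and it could equally be written as a lookup table. A sketch only, using local enums that copy just the constants the switch mentions (the real Doris enums may have more):

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

public class LoadStateDemo {
    // Local copies of only the enum constants that validState() in this hunk refers to.
    enum JobStatus { RUNNING, STOPPED, FINISHED }

    enum JobState { PENDING, ETL, LOADING, COMMITTED, CANCELLED, FINISHED }

    // Which requested load states each scheduler status satisfies (same cases as the switch).
    private static final Map<JobStatus, Set<JobState>> MATCHING = new EnumMap<>(JobStatus.class);

    static {
        MATCHING.put(JobStatus.RUNNING,
                EnumSet.of(JobState.PENDING, JobState.ETL, JobState.LOADING, JobState.COMMITTED));
        MATCHING.put(JobStatus.STOPPED, EnumSet.of(JobState.CANCELLED));
        MATCHING.put(JobStatus.FINISHED, EnumSet.of(JobState.FINISHED));
    }

    // A status without an entry matches nothing, like the switch's default branch.
    static boolean validState(JobState requested, JobStatus actual) {
        Set<JobState> states = MATCHING.get(actual);
        return states != null && states.contains(requested);
    }
}
```

The table keeps the whole mapping in one place; the switch is just as correct, so this is a style choice rather than a fix.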
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/load/LabelProcessor.java:
##########
@@ -0,0 +1,124 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.jobs.load;
+
+import org.apache.doris.catalog.Database;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.CaseSensibility;
+import org.apache.doris.common.LabelAlreadyUsedException;
+import org.apache.doris.common.PatternMatcher;
+import org.apache.doris.common.PatternMatcherWrapper;
+import org.apache.doris.job.exception.JobException;
+import org.apache.doris.job.extensions.insert.InsertJob;
+
+import com.google.common.base.Strings;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+/**
+ * label manager
+ */
+public class LabelProcessor {
+ private final Map<Long, Map<String, List<InsertJob>>> dbIdToLabelToLoadJobs = new ConcurrentHashMap<>();
+
+ /**
+ * getLabelJob
+ * @param db db
+ * @return label jobs
+ * @throws JobException e
+ */
+ public List<InsertJob> getLabelJobs(Database db) throws JobException {
Review Comment:
The method names `getLabelJobs` and `addLabelJob` sound strange.
How about just `getJobs` and `addJob`?
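A sketch of what the renamed methods could look like, not the PR's `LabelProcessor`: `SketchJob` stands in for `InsertJob`, `IllegalArgumentException` stands in for `LabelAlreadyUsedException`, and `getJobs` returns an empty list where the PR's `getLabelJobs` is documented to throw `JobException`. `computeIfAbsent` replaces the `containsKey`/`put` pairs, and a coarse `synchronized` lock is used for simplicity.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class LabelRegistryDemo {
    /** Hypothetical stand-in for InsertJob: only the fields the registry needs. */
    static final class SketchJob {
        final long dbId;
        final String label;

        SketchJob(long dbId, String label) {
            this.dbId = dbId;
            this.label = label;
        }
    }

    /** Sketch of a label processor using the shorter names addJob/getJobs. */
    static final class SketchLabelProcessor {
        // db id -> label -> jobs; guarded by "this".
        private final Map<Long, Map<String, List<SketchJob>>> dbIdToLabelToJobs = new HashMap<>();

        /** Adds a job, rejecting a label that is already used in the same database. */
        synchronized void addJob(SketchJob job) {
            Map<String, List<SketchJob>> labelToJobs =
                    dbIdToLabelToJobs.computeIfAbsent(job.dbId, id -> new HashMap<>());
            if (labelToJobs.containsKey(job.label)) {
                // Stands in for LabelAlreadyUsedException to keep the sketch self-contained.
                throw new IllegalArgumentException("Label already used: " + job.label);
            }
            labelToJobs.computeIfAbsent(job.label, l -> new ArrayList<>()).add(job);
        }

        /** Returns all jobs of a database, or an empty list if it has none. */
        synchronized List<SketchJob> getJobs(long dbId) {
            Map<String, List<SketchJob>> labelToJobs = dbIdToLabelToJobs.get(dbId);
            if (labelToJobs == null) {
                return Collections.emptyList();
            }
            List<SketchJob> all = new ArrayList<>();
            for (List<SketchJob> jobs : labelToJobs.values()) {
                all.addAll(jobs);
            }
            return all;
        }
    }
}
```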
##########
fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java:
##########
@@ -303,7 +348,19 @@ public ShowResultSetMetaData getJobMetaData() {
return builder.build();
}
- private static long getNextId() {
Review Comment:
Why change the way the task id is generated?
##########
fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java:
##########
@@ -54,35 +54,73 @@
@Log4j2
public abstract class AbstractJob<T extends AbstractTask, C> implements Job<T, C>, Writable {
- @SerializedName(value = "jid")
+ @SerializedName(value = "jobId")
Review Comment:
Do not change it.
We suggest using abbreviations in the future to minimize the edit log size.
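To illustrate the size argument with a hypothetical stdlib-only serializer (not Gson): every key name is written into each serialized record, so longer names cost bytes on every job written. Using the key pairs from this diff (`jid`/`jobId`, `jn`/`jobName`, `ctms`/`createTimeMs`) and made-up values, each record below is 15 bytes larger with the long names.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;

public class KeySizeDemo {
    // Hypothetical stand-in for Gson's @SerializedName: names the JSON key of a field.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.FIELD)
    @interface Key {
        String value();
    }

    // Same (made-up) data with the long key names introduced by the diff...
    static class LongKeys {
        @Key("jobId") long jobId = 10001L;
        @Key("jobName") String jobName = "job1";
        @Key("createTimeMs") long createTimeMs = 1700000000000L;
    }

    // ...and with the abbreviations they replace.
    static class ShortKeys {
        @Key("jid") long jobId = 10001L;
        @Key("jn") String jobName = "job1";
        @Key("ctms") long createTimeMs = 1700000000000L;
    }

    // Tiny JSON writer for the demo: strings are quoted but not escaped.
    static String toJson(Object obj) {
        StringBuilder sb = new StringBuilder("{");
        for (Field f : obj.getClass().getDeclaredFields()) {
            Key key = f.getAnnotation(Key.class);
            if (key == null) {
                continue;
            }
            Object value;
            try {
                f.setAccessible(true);
                value = f.get(obj);
            } catch (IllegalAccessException e) {
                throw new IllegalStateException(e);
            }
            if (sb.length() > 1) {
                sb.append(',');
            }
            sb.append('"').append(key.value()).append("\":");
            sb.append(value instanceof String ? "\"" + value + "\"" : String.valueOf(value));
        }
        return sb.append('}').toString();
    }

    static int sizeBytes(Object obj) {
        return toJson(obj).getBytes(StandardCharsets.UTF_8).length;
    }

    public static void main(String[] args) {
        System.out.println(toJson(new LongKeys()) + " -> " + sizeBytes(new LongKeys()) + " bytes");
        System.out.println(toJson(new ShortKeys()) + " -> " + sizeBytes(new ShortKeys()) + " bytes");
    }
}
```

Separately, renaming a persisted key means data already written under the old name is not read back under the new one; Gson's `@SerializedName` offers an `alternate` attribute for reading old names when a rename is unavoidable.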
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/load/LoadMgr.java:
##########
@@ -0,0 +1,340 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.jobs.load;
+
+import org.apache.doris.analysis.CompoundPredicate;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.CaseSensibility;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.LabelAlreadyUsedException;
+import org.apache.doris.common.PatternMatcher;
+import org.apache.doris.common.PatternMatcherWrapper;
+import org.apache.doris.common.util.LogBuilder;
+import org.apache.doris.common.util.LogKey;
+import org.apache.doris.job.base.JobExecuteType;
+import org.apache.doris.job.base.JobExecutionConfiguration;
+import org.apache.doris.job.common.JobStatus;
+import org.apache.doris.job.exception.JobException;
+import org.apache.doris.job.extensions.insert.InsertJob;
+import org.apache.doris.job.manager.JobManager;
+import org.apache.doris.load.loadv2.JobState;
+import org.apache.doris.nereids.jobs.load.replay.ReplayLoadLog;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+
+/**
+ * load manager
+ */
+public class LoadMgr {
+ private static final Logger LOG = LogManager.getLogger(LoadMgr.class);
+ private Map<Long, InsertJob> loadIdToJob = new HashMap<>();
+ private final Map<Long, Map<String, List<InsertJob>>> dbIdToLabelToLoadJobs = new ConcurrentHashMap<>();
+
+ // lock for export job
+ // lock is private and must use after db lock
+ private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
+
+ private void readLock() {
+ lock.readLock().lock();
+ }
+
+ private void readUnlock() {
+ lock.readLock().unlock();
+ }
+
+ private void writeLock() {
+ lock.writeLock().lock();
+ }
+
+ private void writeUnlock() {
+ lock.writeLock().unlock();
+ }
+
+ private JobManager<InsertJob, ?> getJobManager() {
+ return Env.getCurrentEnv().getJobManager();
+ }
+
+ /**
+ * add load job and add tasks
+ * @param loadJob job
+ */
+ public void addLoadJob(InsertJob loadJob) throws DdlException {
+ writeLock();
+ try {
+ Map<String, List<InsertJob>> labelToLoadJobs = dbIdToLabelToLoadJobs.get(loadJob.getDbId());
+ if (labelToLoadJobs != null && labelToLoadJobs.containsKey(loadJob.getLabel())) {
+ throw new LabelAlreadyUsedException(loadJob.getLabel());
+ }
+ unprotectAddJob(loadJob);
+ } catch (LabelAlreadyUsedException e) {
+ throw new RuntimeException(e);
+ } finally {
+ writeUnlock();
+ }
+ }
+
+ private void unprotectAddJob(InsertJob job) throws DdlException {
+ loadIdToJob.put(job.getJobId(), job);
+ try {
+ getJobManager().registerJob(job);
+ if (!dbIdToLabelToLoadJobs.containsKey(job.getDbId())) {
+ dbIdToLabelToLoadJobs.put(job.getDbId(), new ConcurrentHashMap<>());
+ }
+ Map<String, List<InsertJob>> labelToLoadJobs = dbIdToLabelToLoadJobs.get(job.getDbId());
+ if (!labelToLoadJobs.containsKey(job.getLabel())) {
+ labelToLoadJobs.put(job.getLabel(), new ArrayList<>());
+ }
+ labelToLoadJobs.get(job.getLabel()).add(job);
+ } catch (org.apache.doris.job.exception.JobException e) {
+ throw new DdlException(e.getMessage(), e);
+ }
+ }
+
+ /**
+ * replay load job
+ * @param replayLoadLog load log
+ * @throws DdlException ex
+ */
+ public void replayLoadJob(ReplayLoadLog replayLoadLog) throws DdlException {
+ writeLock();
+ try {
+ if (replayLoadLog instanceof ReplayLoadLog.ReplayCreateLoadLog) {
+ InsertJob loadJob = new InsertJob((ReplayLoadLog.ReplayCreateLoadLog) replayLoadLog);
+ JobExecutionConfiguration jobConfig = new JobExecutionConfiguration();
+ jobConfig.setExecuteType(JobExecuteType.INSTANT);
+ loadJob.setJobConfig(jobConfig);
+ addLoadJob(loadJob);
+ LOG.info(new LogBuilder(LogKey.LOAD_JOB, loadJob.getJobId()).add("msg", "replay create load job")
+ .build());
+ } else if (replayLoadLog instanceof ReplayLoadLog.ReplayEndLoadLog) {
+ InsertJob job = loadIdToJob.get(replayLoadLog.getId());
+ if (job == null) {
+ // This should not happen.
+ // Last time I found that when user submit a job with already used label, an END_LOAD_JOB edit log
+ // will be written but the job is not added to 'idToLoadJob', so this job here we got will be null.
+ // And this bug has been fixed.
+ // Just add a log here to observe.
+ LOG.warn("job does not exist when replaying end load job edit log: {}", replayLoadLog);
+ return;
+ }
+ job.unprotectReadEndOperation((ReplayLoadLog.ReplayEndLoadLog) replayLoadLog);
+ LOG.info(new LogBuilder(LogKey.LOAD_JOB, replayLoadLog.getId()).add("operation", replayLoadLog)
+ .add("msg", "replay end load job").build());
+ } else {
+ throw new DdlException("Unsupported replay job type. ");
+ }
+ } finally {
+ writeUnlock();
+ }
+ }
+
+ // public void updateJobProgress(Long jobId, Long beId, TUniqueId loadId, TUniqueId fragmentId, long scannedRows,
+ // long scannedBytes, boolean isDone) {
+ // LoadJobExecutor job = loadIdToJob.get(jobId);
+ // if (job != null) {
+ // job.updateLoadingStatus(beId, loadId, fragmentId, scannedRows, scannedBytes, isDone);
+ // }
+ // }
+
+ /**
+ * cancel job
+ *
+ * @param dbName dbName
+ * @param label job label
+ * @param state job state
+ * @param operator filter operator, like or equals
+ */
+ public void cancelLoadJob(String dbName, String label, String state, CompoundPredicate.Operator operator)
+ throws JobException, AnalysisException, DdlException {
+ Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(dbName);
+ // List of load jobs waiting to be cancelled
+ List<InsertJob> uncompletedLoadJob;
+ readLock();
+ try {
+ Map<String, List<InsertJob>> labelToLoadJobs = dbIdToLabelToLoadJobs.get(db.getId());
+ if (labelToLoadJobs == null) {
+ throw new JobException("Load job does not exist");
+ }
+ List<InsertJob> matchLoadJobs = Lists.newArrayList();
+ addNeedCancelLoadJob(label, state, operator,
+ labelToLoadJobs.values().stream().flatMap(Collection::stream).collect(Collectors.toList()),
+ matchLoadJobs);
+ if (matchLoadJobs.isEmpty()) {
+ throw new JobException("Load job does not exist");
+ }
+ // check state here
+ uncompletedLoadJob =
+ matchLoadJobs.stream().filter(InsertJob::isRunning)
+ .collect(Collectors.toList());
+ if (uncompletedLoadJob.isEmpty()) {
+ throw new JobException("There is no uncompleted job");
+ }
+ } finally {
+ readUnlock();
+ }
+ for (InsertJob loadJob : uncompletedLoadJob) {
+ try {
+ loadJob.cancelJob();
+ } catch (JobException e) {
+ LOG.warn("Fail to cancel job, its label: {}", loadJob.getLabel());
+ }
+ }
+ }
+
+ private static void addNeedCancelLoadJob(String label, String state,
Review Comment:
```suggestion
private void addNeedCancelLoadJob(String label, String state,
```
##########
fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java:
##########
@@ -65,58 +100,171 @@ public class InsertJob extends AbstractJob<InsertTask, Map> {
new Column("CreateTime", ScalarType.createStringType()),
new Column("Comment", ScalarType.createStringType()));
+ private static final ShowResultSetMetaData TASK_META_DATA =
+ ShowResultSetMetaData.builder()
+ .addColumn(new Column("TaskId", ScalarType.createVarchar(20)))
+ .addColumn(new Column("Label", ScalarType.createVarchar(20)))
+ .addColumn(new Column("Status", ScalarType.createVarchar(20)))
+ .addColumn(new Column("EtlInfo", ScalarType.createVarchar(20)))
+ .addColumn(new Column("TaskInfo", ScalarType.createVarchar(20)))
+ .addColumn(new Column("ErrorMsg", ScalarType.createVarchar(20)))
+
+ .addColumn(new Column("CreateTimeMs", ScalarType.createVarchar(20)))
+ .addColumn(new Column("FinishTimeMs", ScalarType.createVarchar(20)))
+ .addColumn(new Column("TrackingUrl", ScalarType.createVarchar(20)))
+ .addColumn(new Column("LoadStatistic", ScalarType.createVarchar(20)))
+ .addColumn(new Column("User", ScalarType.createVarchar(20)))
+ .build();
+
public static final ImmutableMap<String, Integer> COLUMN_TO_INDEX;
static {
- ImmutableMap.Builder<String, Integer> builder = new ImmutableMap.Builder();
+ ImmutableMap.Builder<String, Integer> builder = new ImmutableMap.Builder<>();
for (int i = 0; i < SCHEMA.size(); i++) {
builder.put(SCHEMA.get(i).getName().toLowerCase(), i);
}
COLUMN_TO_INDEX = builder.build();
}
- @SerializedName(value = "lp")
- String labelPrefix;
+ @SerializedName("taskIdList")
+ ConcurrentLinkedQueue<Long> taskIdList;
+ @SerializedName("dbId")
+ private final long dbId;
+ @SerializedName("labelName")
+ private String labelName;
+ @SerializedName("loadType")
+ private InsertJob.LoadType loadType;
+ // 0: the job status is pending
+ // n/100: n is the number of task which has been finished
+ // 99: all tasks have been finished
+ // 100: txn status is visible and load has been finished
+ @SerializedName("progress")
Review Comment:
Use an abbreviation as the serialized name.
##########
fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java:
##########
@@ -65,58 +100,171 @@ public class InsertJob extends AbstractJob<InsertTask, Map> {
new Column("CreateTime", ScalarType.createStringType()),
new Column("Comment", ScalarType.createStringType()));
+ private static final ShowResultSetMetaData TASK_META_DATA =
+ ShowResultSetMetaData.builder()
+ .addColumn(new Column("TaskId", ScalarType.createVarchar(20)))
Review Comment:
Use String as the column type for all columns. Varchar(20) is not long enough.
##########
fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java:
##########
@@ -125,23 +273,30 @@ public void cancelTaskById(long taskId) throws JobException {
}
@Override
- public boolean isReadyForScheduling(Map taskContext) {
- return CollectionUtils.isEmpty(getRunningTasks());
+ public void cancelAllTasks() throws JobException {
+ writeLock();
Review Comment:
What does this lock protect?
##########
fe/fe-core/src/main/java/org/apache/doris/job/base/Job.java:
##########
@@ -99,11 +99,34 @@ public interface Job<T extends AbstractTask, C> {
/**
* Cancels all running tasks of this job.
- *
* @throws JobException If cancelling a running task fails.
*/
void cancelAllTasks() throws JobException;
+ /**
+ * register job
+ * @throws JobException If register job failed.
+ */
+ void onRegister() throws JobException;
+
+ /**
+ * register job failed
+ * @throws JobException If failed.
+ */
+ void onRegisterFailed() throws JobException;
+
+ /**
+ * relay create job
Review Comment:
```suggestion
* replay create job
```
##########
fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java:
##########
@@ -74,30 +72,70 @@ public class InsertTask extends AbstractTask {
}
private String labelName;
-
private InsertIntoTableCommand command;
-
private StmtExecutor stmtExecutor;
-
private ConnectContext ctx;
-
private String sql;
-
private String currentDb;
-
private UserIdentity userIdentity;
-
+ private LoadStatistic loadStatistic;
private AtomicBoolean isCanceled = new AtomicBoolean(false);
-
private AtomicBoolean isFinished = new AtomicBoolean(false);
-
private static final String LABEL_SPLITTER = "_";
+ private FailMsg failMsg;
+ @Getter
+ private String trackingUrl;
@Getter
@Setter
- private LoadJob loadJob;
+ private InsertJob jobInfo;
+ private TaskType taskType = TaskType.PENDING;
+ private MergeType mergeType = MergeType.APPEND;
+
+ /**
+ * task merge type
+ */
+ enum MergeType {
+ MERGE,
+ APPEND,
+ DELETE
+ }
+
+ /**
+ * task type
+ */
+ enum TaskType {
+ UNKNOWN, // this is only for ISSUE #2354
+ PENDING,
+ LOADING,
+ FINISHED,
+ FAILED,
+ CANCELLED
+ }
+
+ public InsertTask(InsertIntoTableCommand insertInto,
+ ConnectContext ctx, StmtExecutor executor, LoadStatistic statistic) {
+ this(null, insertInto, ctx, executor, statistic);
+ }
+
+ public InsertTask(String labelName, String currentDb, String sql, UserIdentity userIdentity) {
+ this.labelName = labelName;
+ this.sql = sql;
+ this.currentDb = currentDb;
+ this.userIdentity = userIdentity;
+ setTaskId(Env.getCurrentEnv().getNextId());
+ }
+ public InsertTask(String labelName, InsertIntoTableCommand insertInto,
+ ConnectContext ctx, StmtExecutor executor, LoadStatistic statistic) {
+ this.labelName = labelName;
Review Comment:
Label is not used in `InsertTask`.
So I think we can merge this constructor with the first one.
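A minimal sketch of the merged constructor, assuming `labelName` is dropped entirely so the `this(null, ...)` delegation disappears. `TaskSketch` and its `Object` fields are hypothetical stand-ins for `InsertTask` and the real `InsertIntoTableCommand`, `ConnectContext`, `StmtExecutor` and `LoadStatistic` types:

```java
// Hypothetical, simplified stand-in for InsertTask; the real command,
// context, executor and statistic types are not modeled here.
final class TaskSketch {
    private final Object command;   // stands in for InsertIntoTableCommand
    private final Object ctx;       // stands in for ConnectContext
    private final Object executor;  // stands in for StmtExecutor
    private final Object statistic; // stands in for LoadStatistic

    // Single constructor: with the unused labelName parameter removed there
    // is nothing left to default, so no second overload is needed.
    TaskSketch(Object command, Object ctx, Object executor, Object statistic) {
        this.command = command;
        this.ctx = ctx;
        this.executor = executor;
        this.statistic = statistic;
    }

    Object command() {
        return command;
    }
}
```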
##########
fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java:
##########
@@ -203,27 +351,273 @@ public void onTaskSuccess(InsertTask task) throws JobException {
@Override
public List<String> getShowInfo() {
- return super.getCommonShowInfo();
+ readLock();
+ try {
+ // check auth
+ checkAuth("SHOW LOAD");
+ List<String> jobInfo = Lists.newArrayList();
+ // jobId
+ jobInfo.add(getJobId().toString());
+ // label
+ jobInfo.add(getLabelName());
+ // state
+ jobInfo.add(getJobStatus().name());
+
+ // progress
+ String progress = Env.getCurrentProgressManager().getProgressInfo(String.valueOf(getJobId()));
+ switch (getJobStatus()) {
+ case RUNNING:
+ if (isPending()) {
+ jobInfo.add("ETL:0%; LOAD:0%");
+ } else {
+ jobInfo.add("ETL:100%; LOAD:" + progress + "%");
+ }
+ break;
+ case FINISHED:
+ jobInfo.add("ETL:100%; LOAD:100%");
+ break;
+ case STOPPED:
+ default:
+ jobInfo.add("ETL:N/A; LOAD:N/A");
+ break;
+ }
+ // type
+ jobInfo.add(loadType.name());
+
+ // etl info
+ if (loadStatistic.getCounters().size() == 0) {
+ jobInfo.add(FeConstants.null_string);
+ } else {
+ jobInfo.add(Joiner.on("; ").withKeyValueSeparator("=").join(loadStatistic.getCounters()));
+ }
+
+ // task info
+ jobInfo.add("cluster:" + getResourceName() + "; timeout(s):" + getTimeout()
+ + "; max_filter_ratio:" + getMaxFilterRatio() + "; priority:" + getPriority());
+ // error msg
+ if (failMsg == null) {
+ jobInfo.add(FeConstants.null_string);
+ } else {
+ jobInfo.add("type:" + failMsg.getCancelType() + "; msg:" + failMsg.getMsg());
+ }
+
+ // create time
+ jobInfo.add(TimeUtils.longToTimeString(getCreateTimeMs()));
+ // etl start time
+ jobInfo.add(TimeUtils.longToTimeString(getStartTimeMs()));
+ // etl end time
+ jobInfo.add(TimeUtils.longToTimeString(getStartTimeMs()));
+ // load start time
+ jobInfo.add(TimeUtils.longToTimeString(getStartTimeMs()));
+ // load end time
+ jobInfo.add(TimeUtils.longToTimeString(getFinishTimeMs()));
+ // tracking urls
+ List<String> trackingUrl = idToTasks.values().stream()
+ .map(InsertTask::getTrackingUrl)
+ .collect(Collectors.toList());
+ if (trackingUrl.isEmpty()) {
+ jobInfo.add(FeConstants.null_string);
+ } else {
+ jobInfo.add(trackingUrl.toString());
+ }
+ // job details
+ jobInfo.add(loadStatistic.toJson());
+ // transaction id
+ jobInfo.add(String.valueOf(0));
+ // error tablets
+ jobInfo.add(errorTabletsToJson());
+ // user, some load job may not have user info
+ if (getCreateUser() == null || getCreateUser().getQualifiedUser() == null) {
+ jobInfo.add(FeConstants.null_string);
+ } else {
+ jobInfo.add(getCreateUser().getQualifiedUser());
+ }
+ // comment
+ jobInfo.add(getComment());
+ return jobInfo;
+ } catch (DdlException e) {
+ throw new RuntimeException(e);
+ } finally {
+ readUnlock();
+ }
+ }
+
+ private String getPriority() {
+ return properties.getOrDefault(LoadStmt.PRIORITY, Priority.NORMAL.name());
+ }
+
+ public double getMaxFilterRatio() {
+ return Double.parseDouble(properties.getOrDefault(LoadStmt.MAX_FILTER_RATIO_PROPERTY, "0.0"));
+ }
+
+ public long getTimeout() {
+ if (properties.containsKey(LoadStmt.TIMEOUT_PROPERTY)) {
+ return Long.parseLong(properties.get(LoadStmt.TIMEOUT_PROPERTY));
+ }
+ return Config.broker_load_default_timeout_second;
+ }
+
+
+ public static InsertJob readFields(DataInput in) throws IOException {
+ String jsonJob = Text.readString(in);
+ InsertJob job = GsonUtils.GSON.fromJson(jsonJob, InsertJob.class);
+ job.setRunningTasks(new ArrayList<>());
+ return job;
}
@Override
public void write(DataOutput out) throws IOException {
Text.writeString(out, GsonUtils.GSON.toJson(this));
}
- private static final ShowResultSetMetaData TASK_META_DATA =
- ShowResultSetMetaData.builder()
- .addColumn(new Column("TaskId", ScalarType.createVarchar(20)))
- .addColumn(new Column("Label", ScalarType.createVarchar(20)))
- .addColumn(new Column("Status", ScalarType.createVarchar(20)))
- .addColumn(new Column("EtlInfo", ScalarType.createVarchar(20)))
- .addColumn(new Column("TaskInfo", ScalarType.createVarchar(20)))
- .addColumn(new Column("ErrorMsg", ScalarType.createVarchar(20)))
+ public String errorTabletsToJson() {
+ Map<Long, String> map = new HashMap<>();
+ errorTabletInfos.stream().limit(Config.max_error_tablet_of_broker_load)
+ .forEach(p -> map.put(p.getTabletId(), p.getMsg()));
+ Gson gson = new GsonBuilder().disableHtmlEscaping().create();
+ return gson.toJson(map);
+ }
- .addColumn(new Column("CreateTimeMs", ScalarType.createVarchar(20)))
- .addColumn(new Column("FinishTimeMs", ScalarType.createVarchar(20)))
- .addColumn(new Column("TrackingUrl", ScalarType.createVarchar(20)))
- .addColumn(new Column("LoadStatistic", ScalarType.createVarchar(20)))
- .addColumn(new Column("User", ScalarType.createVarchar(20)))
- .build();
+ public void updateLoadingStatus(Long beId, TUniqueId loadId, TUniqueId fragmentId, long scannedRows,
+ long scannedBytes, boolean isDone) {
+ loadStatistic.updateLoadProgress(beId, loadId, fragmentId, scannedRows, scannedBytes, isDone);
+ progress = (int) ((double) finishedTaskIds.size() / idToTasks.size() * 100);
+ if (progress == 100) {
+ progress = 99;
+ }
+ }
+
+ private void checkAuth(String command) throws DdlException {
+ if (authorizationInfo == null) {
+ // use the old method to check priv
+ checkAuthWithoutAuthInfo(command);
+ return;
+ }
+ if (!Env.getCurrentEnv().getAccessManager().checkPrivByAuthInfo(ConnectContext.get(), authorizationInfo,
+ PrivPredicate.LOAD)) {
+ ErrorReport.reportDdlException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR,
+ Privilege.LOAD_PRIV);
+ }
+ }
+
+ /**
+ * This method is compatible with old load job without authorization info
+ * If db or table name could not be found by id, it will throw the NOT_EXISTS_ERROR
+ *
+ * @throws DdlException
+ */
+ private void checkAuthWithoutAuthInfo(String command) throws DdlException {
+ Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(dbId);
+ // check auth
+ if (tableNames.isEmpty()) {
+ // forward compatibility
+ if (!Env.getCurrentEnv().getAccessManager().checkDbPriv(ConnectContext.get(), db.getFullName(),
+ PrivPredicate.LOAD)) {
+ ErrorReport.reportDdlException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR,
+ Privilege.LOAD_PRIV);
+ }
+ } else {
+ for (String tblName : tableNames) {
+ if (!Env.getCurrentEnv().getAccessManager().checkTblPriv(ConnectContext.get(), db.getFullName(),
+ tblName, PrivPredicate.LOAD)) {
+ ErrorReport.reportDdlException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR,
+ command,
+ ConnectContext.get().getQualifiedUser(),
+ ConnectContext.get().getRemoteIP(), db.getFullName() + ": " + tblName);
+ }
+ }
+ }
+ }
+
+ public void unprotectReadEndOperation(InsertJob replayLog) {
+ setJobStatus(replayLog.getJobStatus());
+ progress = replayLog.getProgress();
+ setStartTimeMs(replayLog.getStartTimeMs());
+ setFinishTimeMs(replayLog.getFinishTimeMs());
+ failMsg = replayLog.getFailMsg();
+ }
+
+ public String getResourceName() {
+ // TODO: get tvf param from tvf relation
+ return LoadType.BULK.name();
+ }
+
+ public boolean isRunning() {
+ return getJobStatus() != JobStatus.FINISHED;
+ }
+
+ public boolean isPending() {
+ return getJobStatus() != JobStatus.FINISHED;
+ }
+
+ public boolean isCommitted() {
Review Comment:
Remove the unused methods `isCommitted()` and `isFinished()`.
##########
fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java:
##########
@@ -26,57 +34,105 @@
import org.apache.doris.job.common.JobType;
import org.apache.doris.job.common.TaskType;
import org.apache.doris.job.exception.JobException;
+import org.apache.doris.job.extensions.insert.InsertJob;
import org.apache.doris.job.scheduler.JobScheduler;
import org.apache.doris.job.task.AbstractTask;
+import org.apache.doris.load.loadv2.JobState;
+import com.google.common.collect.Lists;
import lombok.extern.log4j.Log4j2;
import org.apache.commons.lang3.StringUtils;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
@Log4j2
public class JobManager<T extends AbstractJob<?, C>, C> implements Writable {
-
private final ConcurrentHashMap<Long, T> jobMap = new ConcurrentHashMap<>(32);
- private JobScheduler jobScheduler;
+ private JobScheduler<T, C> jobScheduler;
+
+ // lock for job
+ // lock is private and must use after db lock
+ private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
+
+ private void readLock() {
+ lock.readLock().lock();
+ }
+
+ private void readUnlock() {
+ lock.readLock().unlock();
+ }
+
+ private void writeLock() {
+ lock.writeLock().lock();
+ }
+
+ private void writeUnlock() {
+ lock.writeLock().unlock();
+ }
public void start() {
- jobScheduler = new JobScheduler(jobMap);
+ jobScheduler = new JobScheduler<T, C>(jobMap);
jobScheduler.start();
}
+
+ /**
+ * get running job
+ *
+ * @param jobId id
+ * @return running job
+ */
+ public T getJob(long jobId) {
+ return jobMap.get(jobId);
+ }
+
public void registerJob(T job) throws JobException {
- job.checkJobParams();
- checkJobNameExist(job.getJobName());
- if (jobMap.get(job.getJobId()) != null) {
- throw new JobException("job id exist, jobId:" + job.getJobId());
+ writeLock();
+ try {
+ job.onRegister();
+ job.checkJobParams();
+ checkJobNameExist(job.getJobName());
+ if (jobMap.get(job.getJobId()) != null) {
+ throw new JobException("job id exist, jobId:" + job.getJobId());
+ }
+ jobMap.put(job.getJobId(), job);
+ //check its need to scheduler
+ jobScheduler.scheduleOneJob(job);
+ job.logCreateOperation();
+ } catch (JobException e) {
+ // job.onRegisterFailed();
Review Comment:
Why is this commented out?
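If the intent of `onRegisterFailed()` is to undo whatever `onRegister()` did, one way to keep it active is to call it in the `catch` block and rethrow, attaching any cleanup failure as a suppressed exception. This is a hypothetical sketch: `SketchJob`, `SketchJobManager` and `RegisterException` stand in for `AbstractJob`, `JobManager` and `JobException`, and a `synchronized` method replaces the manager's write lock for brevity:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical checked exception standing in for JobException.
class RegisterException extends Exception {
    RegisterException(String msg) {
        super(msg);
    }
}

// Hypothetical minimal job contract mirroring onRegister()/onRegisterFailed().
interface SketchJob {
    long id();

    void onRegister() throws RegisterException;

    void onRegisterFailed() throws RegisterException;
}

class SketchJobManager {
    private final Map<Long, SketchJob> jobMap = new ConcurrentHashMap<>();

    synchronized void registerJob(SketchJob job) throws RegisterException {
        try {
            job.onRegister();
            // putIfAbsent makes the duplicate-id check and the insert one step.
            if (jobMap.putIfAbsent(job.id(), job) != null) {
                throw new RegisterException("job id exist, jobId:" + job.id());
            }
        } catch (RegisterException e) {
            // Roll back onRegister() before propagating, keeping the original
            // failure as the primary exception.
            try {
                job.onRegisterFailed();
            } catch (RegisterException cleanup) {
                e.addSuppressed(cleanup);
            }
            throw e;
        }
    }

    boolean contains(long id) {
        return jobMap.containsKey(id);
    }
}
```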
##########
fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java:
##########
@@ -65,58 +100,171 @@ public class InsertJob extends AbstractJob<InsertTask, Map> {
new Column("CreateTime", ScalarType.createStringType()),
new Column("Comment", ScalarType.createStringType()));
+ private static final ShowResultSetMetaData TASK_META_DATA =
+ ShowResultSetMetaData.builder()
+ .addColumn(new Column("TaskId", ScalarType.createVarchar(20)))
+ .addColumn(new Column("Label", ScalarType.createVarchar(20)))
+ .addColumn(new Column("Status", ScalarType.createVarchar(20)))
+ .addColumn(new Column("EtlInfo", ScalarType.createVarchar(20)))
+ .addColumn(new Column("TaskInfo", ScalarType.createVarchar(20)))
+ .addColumn(new Column("ErrorMsg", ScalarType.createVarchar(20)))
+
+ .addColumn(new Column("CreateTimeMs", ScalarType.createVarchar(20)))
+ .addColumn(new Column("FinishTimeMs", ScalarType.createVarchar(20)))
+ .addColumn(new Column("TrackingUrl", ScalarType.createVarchar(20)))
+ .addColumn(new Column("LoadStatistic", ScalarType.createVarchar(20)))
+ .addColumn(new Column("User", ScalarType.createVarchar(20)))
+ .build();
+
public static final ImmutableMap<String, Integer> COLUMN_TO_INDEX;
static {
- ImmutableMap.Builder<String, Integer> builder = new ImmutableMap.Builder();
+ ImmutableMap.Builder<String, Integer> builder = new ImmutableMap.Builder<>();
for (int i = 0; i < SCHEMA.size(); i++) {
builder.put(SCHEMA.get(i).getName().toLowerCase(), i);
}
COLUMN_TO_INDEX = builder.build();
}
- @SerializedName(value = "lp")
- String labelPrefix;
+ @SerializedName("taskIdList")
+ ConcurrentLinkedQueue<Long> taskIdList;
+ @SerializedName("dbId")
+ private final long dbId;
+ @SerializedName("labelName")
+ private String labelName;
+ @SerializedName("loadType")
+ private InsertJob.LoadType loadType;
+ // 0: the job status is pending
+ // n/100: n is the number of task which has been finished
+ // 99: all tasks have been finished
+ // 100: txn status is visible and load has been finished
+ @SerializedName("progress")
+ private int progress;
+ @SerializedName("failMsg")
+ private FailMsg failMsg;
+ @SerializedName("plans")
+ private List<InsertIntoTableCommand> plans;
+ private LoadStatistic loadStatistic = new LoadStatistic();
+ private Set<Long> finishedTaskIds = new HashSet<>();
+ private Set<String> tableNames;
+ private ConcurrentHashMap<Long, InsertTask> idToTasks = new ConcurrentHashMap<>();
+ private Map<String, String> properties;
+ private AuthorizationInfo authorizationInfo;
+ private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
+
+ private ConnectContext ctx;
+ private StmtExecutor stmtExecutor;
+ private List<ErrorTabletInfo> errorTabletInfos = new ArrayList<>();
+ private List<TabletCommitInfo> commitInfos = new ArrayList<>();
+
+ // max save task num, do we need to config it?
+ private static final int MAX_SAVE_TASK_NUM = 100;
+
+ /**
+ * load job type
+ */
+ public enum LoadType {
+ BULK,
+ SPARK,
+ LOCAL_FILE,
+ UNKNOWN
+
+ }
+
+ public enum Priority {
+ HIGH(0),
+ NORMAL(1),
+ LOW(2);
- InsertIntoTableCommand command;
+ Priority(int value) {
+ this.value = value;
+ }
- StmtExecutor stmtExecutor;
+ private final int value;
- ConnectContext ctx;
+ public int getValue() {
+ return value;
+ }
+ }
- @SerializedName("tis")
- ConcurrentLinkedQueue<Long> taskIdList;
+ public InsertJob(Long jobId, String jobName,
+ JobStatus jobStatus,
+ LabelName labelName,
+ String comment,
+ UserIdentity createUser,
+ JobExecutionConfiguration jobConfig,
+ Long createTimeMs,
+ String executeSql) {
+ super(jobId, jobName, jobStatus, labelName.getDbName(), comment, createUser,
+ jobConfig, createTimeMs, executeSql, null);
+ this.dbId = ConnectContext.get().getCurrentDbId();
+ this.labelName = labelName.getLabelName();
+ }
- // max save task num, do we need to config it?
- private static final int MAX_SAVE_TASK_NUM = 100;
+ public InsertJob(ConnectContext ctx,
+ StmtExecutor executor,
+ String labelName,
+ List<InsertIntoTableCommand> plans,
+ Set<String> sinkTableNames,
+ Map<String, String> properties,
+ String comment,
+ JobExecutionConfiguration jobConfig) {
+ super(Env.getCurrentEnv().getNextId(), labelName, JobStatus.RUNNING, null,
+ comment, ctx.getCurrentUserIdentity(), jobConfig);
+ this.ctx = ctx;
+ this.plans = plans;
+ this.stmtExecutor = executor;
+ this.dbId = ctx.getCurrentDbId();
+ this.labelName = labelName;
+ this.tableNames = sinkTableNames;
+ this.properties = properties;
+ // TODO: not support other type yet
+ this.loadType = InsertJob.LoadType.BULK;
+ }
@Override
- public List<InsertTask> createTasks(TaskType taskType, Map taskContext) {
- //nothing need to do in insert job
- InsertTask task = new InsertTask(null, getCurrentDbName(), getExecuteSql(), getCreateUser());
- task.setJobId(getJobId());
- task.setTaskType(taskType);
- task.setTaskId(Env.getCurrentEnv().getNextId());
- ArrayList<InsertTask> tasks = new ArrayList<>();
- tasks.add(task);
- super.initTasks(tasks);
- addNewTask(task.getTaskId());
- return tasks;
+ public List<InsertTask> createTasks(TaskType taskType, Map<Object, Object> taskContext) {
+ if (plans.isEmpty()) {
+ InsertTask task = new InsertTask(labelName, getCurrentDbName(), getExecuteSql(), getCreateUser());
+ task.setTaskType(taskType);
+ task.setJobId(getJobId());
+ task.setCreateTimeMs(System.currentTimeMillis());
+ task.setStatus(TaskStatus.PENDING);
+ ArrayList<InsertTask> tasks = new ArrayList<>();
+ tasks.add(task);
+ super.initTasks(tasks);
+ addNewTask(task.getTaskId());
+ return tasks;
+ } else {
+ return createBatchTasks(taskType);
+ }
}
- public void addNewTask(long id) {
+ private List<InsertTask> createBatchTasks(TaskType taskType) {
Review Comment:
We should unify `createBatchTasks()` and `createTasks()`.
For example, why is there `idToTasks.put(task.getTaskId(), task);` in `createBatchTasks()` but not in `createTasks()`?
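One possible shape for the unified method, sketched under the assumption that both paths differ only in where the task sources come from (the plans, or the job's SQL when there are no plans). `SketchInsertJob`, `SketchTask` and the `AtomicLong` id generator are hypothetical stand-ins for `InsertJob`, `InsertTask` and `Env.getNextId()`:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for InsertTask: wraps either a plan or raw SQL text.
final class SketchTask {
    final long taskId;
    final String source;

    SketchTask(long taskId, String source) {
        this.taskId = taskId;
        this.source = source;
    }
}

final class SketchInsertJob {
    // Stands in for Env.getCurrentEnv().getNextId().
    private static final AtomicLong NEXT_ID = new AtomicLong(1);
    private final List<String> plans; // stands in for List<InsertIntoTableCommand>
    private final String executeSql;
    private final Map<Long, SketchTask> idToTasks = new ConcurrentHashMap<>();
    private final ConcurrentLinkedQueue<Long> taskIdList = new ConcurrentLinkedQueue<>();

    SketchInsertJob(List<String> plans, String executeSql) {
        this.plans = plans;
        this.executeSql = executeSql;
    }

    // Single entry point: choose the sources first, then build and register
    // every task the same way.
    List<SketchTask> createTasks() {
        List<String> sources = plans.isEmpty()
                ? Collections.singletonList(executeSql)
                : plans;
        List<SketchTask> tasks = new ArrayList<>();
        for (String source : sources) {
            SketchTask task = new SketchTask(NEXT_ID.getAndIncrement(), source);
            registerTask(task);
            tasks.add(task);
        }
        return tasks;
    }

    // Shared bookkeeping, so idToTasks and taskIdList cannot drift apart
    // between the single-task and batch cases.
    private void registerTask(SketchTask task) {
        idToTasks.put(task.taskId, task);
        taskIdList.add(task.taskId);
    }

    int trackedTaskCount() {
        return idToTasks.size();
    }
}
```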
##########
fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java:
##########
@@ -95,7 +151,7 @@ public void unregisterJob(String jobName) throws JobException {
public void alterJobStatus(Long jobId, JobStatus status) throws JobException {
Review Comment:
```suggestion
private void alterJobStatus(Long jobId, JobStatus status) throws JobException {
```
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/load/LabelProcessor.java:
##########
@@ -0,0 +1,124 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.jobs.load;
+
+import org.apache.doris.catalog.Database;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.CaseSensibility;
+import org.apache.doris.common.LabelAlreadyUsedException;
+import org.apache.doris.common.PatternMatcher;
+import org.apache.doris.common.PatternMatcherWrapper;
+import org.apache.doris.job.exception.JobException;
+import org.apache.doris.job.extensions.insert.InsertJob;
+
+import com.google.common.base.Strings;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+/**
+ * label manager
+ */
+public class LabelProcessor {
+ private final Map<Long, Map<String, List<InsertJob>>> dbIdToLabelToLoadJobs = new ConcurrentHashMap<>();
Review Comment:
This map should be protected by a lock,
not because of concurrency issues but to make compound operations atomic.
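A minimal sketch of what this could look like: the check for an existing label and the insert happen under one write lock, so the check-then-act sequence is atomic even though each individual call on a `ConcurrentHashMap` would already be thread-safe. `LabelRegistrySketch` is hypothetical, jobs are reduced to ids, and `IllegalStateException` stands in for `LabelAlreadyUsedException`:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical sketch of a label registry; jobs are represented by their ids.
final class LabelRegistrySketch {
    private final Map<Long, Map<String, List<Long>>> dbIdToLabelToJobIds = new HashMap<>();
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);

    // Check and insert under one write lock: two callers can never both
    // pass the "label unused" check for the same (dbId, label).
    void addJob(long dbId, String label, long jobId) {
        lock.writeLock().lock();
        try {
            Map<String, List<Long>> labelToJobs =
                    dbIdToLabelToJobIds.computeIfAbsent(dbId, k -> new HashMap<>());
            if (labelToJobs.containsKey(label)) {
                throw new IllegalStateException("Label already used: " + label);
            }
            labelToJobs.computeIfAbsent(label, k -> new ArrayList<>()).add(jobId);
        } finally {
            lock.writeLock().unlock();
        }
    }

    // Readers get a copied snapshot rather than a live view of the map.
    List<Long> getJobs(long dbId) {
        lock.readLock().lock();
        try {
            Map<String, List<Long>> labelToJobs = dbIdToLabelToJobIds.get(dbId);
            if (labelToJobs == null) {
                return Collections.emptyList();
            }
            List<Long> result = new ArrayList<>();
            for (List<Long> jobIds : labelToJobs.values()) {
                result.addAll(jobIds);
            }
            return result;
        } finally {
            lock.readLock().unlock();
        }
    }
}
```

Because every access goes through the lock, a plain `HashMap` is enough in this sketch; switching to `ConcurrentHashMap` alone would not make the check and the insert a single atomic step.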
##########
fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java:
##########
@@ -74,30 +72,70 @@ public class InsertTask extends AbstractTask {
}
private String labelName;
Review Comment:
This field is unused
##########
fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java:
##########
@@ -130,22 +168,14 @@ protected TUniqueId generateQueryId(String taskIdString) {
return new TUniqueId(taskId.getMostSignificantBits(), taskId.getLeastSignificantBits());
}
- public InsertTask(String labelName, String currentDb, String sql, UserIdentity userIdentity) {
- this.labelName = labelName;
- this.sql = sql;
- this.currentDb = currentDb;
- this.userIdentity = userIdentity;
-
- }
-
@Override
public void run() throws JobException {
try {
if (isCanceled.get()) {
log.info("task has been canceled, task id is {}", getTaskId());
return;
}
- command.run(ctx, stmtExecutor);
+ command.runWithStatistic(ctx, stmtExecutor, loadStatistic);
Review Comment:
`Statistic` always means the statistics used by the CBO.
Better to change the name.
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/load/LabelProcessor.java:
##########
@@ -0,0 +1,124 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.jobs.load;
+
+import org.apache.doris.catalog.Database;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.CaseSensibility;
+import org.apache.doris.common.LabelAlreadyUsedException;
+import org.apache.doris.common.PatternMatcher;
+import org.apache.doris.common.PatternMatcherWrapper;
+import org.apache.doris.job.exception.JobException;
+import org.apache.doris.job.extensions.insert.InsertJob;
+
+import com.google.common.base.Strings;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+/**
+ * label manager
+ */
+public class LabelProcessor {
+ private final Map<Long, Map<String, List<InsertJob>>> dbIdToLabelToLoadJobs = new ConcurrentHashMap<>();
+
+ /**
+ * getLabelJob
+ * @param db db
+ * @return label jobs
+ * @throws JobException e
+ */
+ public List<InsertJob> getLabelJobs(Database db) throws JobException {
Review Comment:
Same issue for `filterLabelJobs`
---------------------------------------------------------------------
Re: [PR] [feature](Load)(step2)support nereids load job schedule [doris]
Posted by "wsjz (via GitHub)" <gi...@apache.org>.
wsjz commented on code in PR #26356:
URL: https://github.com/apache/doris/pull/26356#discussion_r1427652917
##########
fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java:
##########
@@ -303,7 +348,19 @@ public ShowResultSetMetaData getJobMetaData() {
return builder.build();
}
- private static long getNextId() {
Review Comment:
We will generate the task ID in the constructor.
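Generating the ID in the constructor only stays safe under concurrent task creation if the shared counter is atomic. The sketch below shows this with `AtomicLong`. `SelfNumberingTask` is a hypothetical class, not the Doris `AbstractTask`. The sketch also assumes an in-memory counter is acceptable, even though a real FE probably needs IDs that stay unique across restarts, which it does not address.

```java
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

/**
 * Hypothetical task class that assigns its own ID in the constructor,
 * instead of having the job call a setter such as setTaskId(getNextId()).
 */
public class SelfNumberingTask {
    // Shared in-memory counter; getAndIncrement() is atomic, so concurrent
    // constructor calls never receive the same ID. Not persisted across restarts.
    private static final AtomicLong NEXT_ID = new AtomicLong(1);

    private final long taskId;

    public SelfNumberingTask() {
        this.taskId = NEXT_ID.getAndIncrement();
    }

    public long getTaskId() {
        return taskId;
    }

    public static void main(String[] args) {
        // Build 1000 tasks in parallel and count how many distinct IDs were assigned.
        Set<Long> ids = IntStream.range(0, 1000).parallel()
                .mapToObj(i -> new SelfNumberingTask().getTaskId())
                .collect(Collectors.toSet());
        System.out.println("distinct ids: " + ids.size());
    }
}
```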
---------------------------------------------------------------------
Re: [PR] [feature](Load)(step2)support nereids load job schedule [doris]
Posted by "doris-robot (via GitHub)" <gi...@apache.org>.
doris-robot commented on PR #26356:
URL: https://github.com/apache/doris/pull/26356#issuecomment-1868842444
(From new machine) TeamCity pipeline, ClickBench performance test result:
the sum of best hot time: 44.88 seconds
stream load tsv: 560 seconds loaded 74807831229 Bytes, about 127 MB/s
stream load json: 19 seconds loaded 2358488459 Bytes, about 118 MB/s
stream load orc: 66 seconds loaded 1101869774 Bytes, about 15 MB/s
stream load parquet: 32 seconds loaded 861443392 Bytes, about 25 MB/s
insert into select: 28.6 seconds inserted 10000000 Rows, about 349K ops/s
storage size: 17183712737 Bytes
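The robot does not say how it derives its throughput figures. The numbers are consistent with dividing bytes by seconds and then by 1024 * 1024, truncating the result. Decimal megabytes would instead give about 133 MB/s for the TSV load, not 127. The Java sketch below reproduces the figures under that inferred assumption. `LoadThroughput` is a hypothetical helper.

```java
/**
 * Reproduces the throughput figures in the ClickBench load summary, assuming
 * (inferred, not documented) that "MB/s" means MiB/s and results are truncated.
 */
public class LoadThroughput {
    private static final long MIB = 1024L * 1024L;

    /** Whole MiB per second, truncated toward zero. */
    public static long mibPerSecond(long bytes, long seconds) {
        return bytes / seconds / MIB;
    }

    /** Whole thousands of rows per second, truncated toward zero. */
    public static long kiloRowsPerSecond(long rows, double seconds) {
        return (long) (rows / seconds / 1000.0);
    }

    public static void main(String[] args) {
        System.out.println("tsv: " + mibPerSecond(74807831229L, 560) + " MB/s");
        System.out.println("json: " + mibPerSecond(2358488459L, 19) + " MB/s");
        System.out.println("orc: " + mibPerSecond(1101869774L, 66) + " MB/s");
        System.out.println("parquet: " + mibPerSecond(861443392L, 32) + " MB/s");
        System.out.println("insert into select: " + kiloRowsPerSecond(10000000L, 28.6) + "K ops/s");
    }
}
```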
---------------------------------------------------------------------
Re: [PR] [feature](Load)(step2)support nereids load job schedule [doris]
Posted by "wsjz (via GitHub)" <gi...@apache.org>.
wsjz commented on PR #26356:
URL: https://github.com/apache/doris/pull/26356#issuecomment-1868824661
run buildall
---------------------------------------------------------------------
Re: [PR] [feature](Load)(step2)support nereids load job schedule [doris]
Posted by "wsjz (via GitHub)" <gi...@apache.org>.
wsjz commented on PR #26356:
URL: https://github.com/apache/doris/pull/26356#issuecomment-1851530308
run buildall
---------------------------------------------------------------------
Re: [PR] [feature](Load)(step2)support nereids load job schedule [doris]
Posted by "doris-robot (via GitHub)" <gi...@apache.org>.
doris-robot commented on PR #26356:
URL: https://github.com/apache/doris/pull/26356#issuecomment-1859543017
TPC-H test result on machine: 'aliyun_ecs.c7a.8xlarge_32C64G'
```
Tpch sf100 test result on commit 77e5930ee90ed59db5101451e5dfac82810aa1c3, data reload: false
run tpch-sf100 query with default conf and session variables
q1 4756 4485 4572 4485
q2 362 159 159 159
q3 1455 1276 1198 1198
q4 1112 922 919 919
q5 3181 3168 3202 3168
q6 241 128 126 126
q7 1009 495 487 487
q8 2225 2217 2224 2217
q9 6744 6686 6699 6686
q10 3206 3260 3273 3260
q11 325 197 201 197
q12 354 210 211 210
q13 4581 3798 3783 3783
q14 242 210 218 210
q15 569 527 519 519
q16 444 390 385 385
q17 1025 658 594 594
q18 7192 6958 6996 6958
q19 1550 1491 1360 1360
q20 577 318 286 286
q21 3083 2668 2687 2668
q22 353 280 289 280
Total cold run time: 44586 ms
Total hot run time: 40155 ms
run tpch-sf100 query with default conf and set session variable runtime_filter_mode=off
q1 4470 4417 4445 4417
q2 266 162 165 162
q3 3545 3536 3528 3528
q4 2410 2395 2394 2394
q5 5744 5730 5751 5730
q6 241 120 121 120
q7 2397 1859 1870 1859
q8 3534 3545 3542 3542
q9 9000 9010 8967 8967
q10 3907 3953 4009 3953
q11 502 389 395 389
q12 764 578 584 578
q13 4307 3596 3555 3555
q14 294 257 255 255
q15 564 522 529 522
q16 520 470 480 470
q17 1887 1836 1850 1836
q18 8840 8342 8394 8342
q19 1741 1773 1768 1768
q20 2256 1940 1935 1935
q21 6526 6170 6171 6170
q22 499 427 435 427
Total cold run time: 64214 ms
Total hot run time: 60919 ms
```
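The per-query rows above have no column headers. The arithmetic suggests the following layout, with every timing in ms:

- query name
- cold run
- hot run 1
- hot run 2
- best (minimum) hot run

Under this layout, summing the cold-run column of the first table gives 44586 ms and summing the best-hot column gives 40155 ms, which matches the reported totals. The layout is inferred from these sums, not documented by the robot. The Java sketch below recomputes both totals under that assumption. `TpchTotals` is a hypothetical helper.

```java
/**
 * Recomputes "Total cold run time" and "Total hot run time" from the
 * per-query rows of a TPC-H report. Assumed (inferred) row layout:
 * name, cold run, hot run 1, hot run 2, best hot run; all in ms.
 */
public class TpchTotals {

    /** Returns {totalColdMs, totalBestHotMs}. */
    public static long[] totals(String report) {
        long cold = 0;
        long hot = 0;
        for (String line : report.split("\n")) {
            String[] f = line.trim().split("\\s+");
            // Count only per-query rows such as "q1 4756 4485 4572 4485".
            if (f.length != 5 || !f[0].matches("q\\d+")) {
                continue;
            }
            long coldRun = Long.parseLong(f[1]);
            long hot1 = Long.parseLong(f[2]);
            long hot2 = Long.parseLong(f[3]);
            long best = Long.parseLong(f[4]);
            // Sanity check for the assumed layout: last column is the faster hot run.
            if (best != Math.min(hot1, hot2)) {
                throw new IllegalArgumentException("unexpected row: " + line);
            }
            cold += coldRun;
            hot += best;
        }
        return new long[] {cold, hot};
    }

    public static void main(String[] args) {
        String sample = String.join("\n",
                "q1 4756 4485 4572 4485",
                "q2 362 159 159 159");
        long[] t = totals(sample);
        System.out.println("cold=" + t[0] + " ms, hot=" + t[1] + " ms");
    }
}
```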
---------------------------------------------------------------------
Re: [PR] [feature](Load)(step2)support nereids load job schedule [doris]
Posted by "wsjz (via GitHub)" <gi...@apache.org>.
wsjz commented on PR #26356:
URL: https://github.com/apache/doris/pull/26356#issuecomment-1862002449
run pipelinex_p0
---------------------------------------------------------------------
Re: [PR] [feature](Load)(step2)support nereids load job schedule [doris]
Posted by "wsjz (via GitHub)" <gi...@apache.org>.
wsjz commented on code in PR #26356:
URL: https://github.com/apache/doris/pull/26356#discussion_r1434742971
##########
fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java:
##########
@@ -154,17 +201,18 @@ public List<T> commonCreateTasks(TaskType taskType, C taskContext) {
return createTasks(taskType, taskContext);
}
- public void initTasks(List<? extends AbstractTask> tasks) {
+ public void initTasks(Collection<? extends T> tasks, TaskType taskType) {
+ if (CollectionUtils.isEmpty(getRunningTasks())) {
+ runningTasks = new ArrayList<>();
+ }
tasks.forEach(task -> {
- task.setJobId(jobId);
- task.setTaskId(getNextId());
+ task.setTaskType(taskType);
+ task.setJobId(getJobId());
Review Comment:
Set the task ID in the task constructor. I think all of these should be put into the task's own constructor.
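Following that suggestion, a task can receive its owning job's ID and task type as constructor arguments and keep them in `final` fields. Then `initTasks` no longer has to patch them in with setters. The sketch below is minimal. `ConstructorInitSketch`, `Job`, `Task`, and the `TaskType` values are hypothetical stand-ins, not the Doris classes.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: tasks get their job ID and type at construction time. */
public class ConstructorInitSketch {

    /** Placeholder values; the real TaskType enum may differ. */
    public enum TaskType { SCHEDULED, MANUAL }

    public static final class Task {
        private final long jobId;
        private final TaskType taskType;

        public Task(long jobId, TaskType taskType) {
            this.jobId = jobId;
            this.taskType = taskType;
        }

        public long getJobId() { return jobId; }

        public TaskType getTaskType() { return taskType; }
    }

    public static final class Job {
        private final long jobId;
        // Initialized once, so no null/empty check is needed before adding tasks.
        private final List<Task> runningTasks = new ArrayList<>();

        public Job(long jobId) { this.jobId = jobId; }

        /** Creates fully initialized tasks; no setters are called afterwards. */
        public List<Task> createTasks(TaskType taskType, int count) {
            List<Task> tasks = new ArrayList<>();
            for (int i = 0; i < count; i++) {
                tasks.add(new Task(jobId, taskType));
            }
            runningTasks.addAll(tasks);
            return tasks;
        }

        public List<Task> getRunningTasks() { return runningTasks; }
    }

    public static void main(String[] args) {
        Job job = new Job(42L);
        job.createTasks(TaskType.MANUAL, 2);
        System.out.println("running tasks: " + job.getRunningTasks().size());
    }
}
```

If `runningTasks` is initialized once when the job is constructed, the `CollectionUtils.isEmpty(getRunningTasks())` check in the diff is no longer needed.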
---------------------------------------------------------------------
Re: [PR] [feature](Load)(step2)support nereids load job schedule [doris]
Posted by "doris-robot (via GitHub)" <gi...@apache.org>.
doris-robot commented on PR #26356:
URL: https://github.com/apache/doris/pull/26356#issuecomment-1859422672
TPC-H test result on machine: 'aliyun_ecs.c7a.8xlarge_32C64G'
```
Tpch sf100 test result on commit db2cd09edf366d66eb55caae3f4fd6d5f0b49ea2, data reload: false
run tpch-sf100 query with default conf and session variables
q1 4729 4493 4491 4491
q2 362 151 158 151
q3 1458 1266 1218 1218
q4 1122 918 927 918
q5 3141 3136 3162 3136
q6 243 127 126 126
q7 983 481 481 481
q8 2240 2224 2187 2187
q9 6712 6685 6657 6657
q10 3226 3271 3279 3271
q11 312 203 203 203
q12 348 209 203 203
q13 4573 3786 3789 3786
q14 245 212 210 210
q15 564 521 524 521
q16 442 383 384 383
q17 1019 620 525 525
q18 7359 6845 6947 6845
q19 1531 1436 1412 1412
q20 557 317 296 296
q21 3048 2678 2684 2678
q22 345 280 279 279
Total cold run time: 44559 ms
Total hot run time: 39977 ms
run tpch-sf100 query with default conf and set session variable runtime_filter_mode=off
q1 4397 4398 4414 4398
q2 264 163 172 163
q3 3534 3527 3519 3519
q4 2383 2360 2374 2360
q5 5711 5735 5742 5735
q6 245 122 122 122
q7 2376 1889 1892 1889
q8 3520 3524 3525 3524
q9 9005 8992 9001 8992
q10 3927 4014 4012 4012
q11 522 392 374 374
q12 770 606 619 606
q13 4305 3567 3554 3554
q14 289 263 257 257
q15 573 524 522 522
q16 510 463 479 463
q17 1861 1873 1834 1834
q18 8652 8336 8332 8332
q19 1748 1760 1738 1738
q20 2257 1934 1932 1932
q21 6511 6157 6131 6131
q22 517 417 424 417
Total cold run time: 63877 ms
Total hot run time: 60874 ms
```
---------------------------------------------------------------------
Re: [PR] [feature](Load)(step2)support nereids load job schedule [doris]
Posted by "doris-robot (via GitHub)" <gi...@apache.org>.
doris-robot commented on PR #26356:
URL: https://github.com/apache/doris/pull/26356#issuecomment-1859705328
TPC-H test result on machine: 'aliyun_ecs.c7a.8xlarge_32C64G'
```
Tpch sf100 test result on commit f962a67ed124134f9a3abef0f825939517ee7a04, data reload: false
run tpch-sf100 query with default conf and session variables
q1 4667 4442 4443 4442
q2 358 156 160 156
q3 1459 1236 1236 1236
q4 1098 870 882 870
q5 3086 3171 3179 3171
q6 245 128 127 127
q7 986 493 481 481
q8 2195 2200 2164 2164
q9 6706 6627 6672 6627
q10 3206 3278 3235 3235
q11 317 207 201 201
q12 344 211 210 210
q13 4591 3793 3835 3793
q14 238 216 217 216
q15 572 533 526 526
q16 442 384 386 384
q17 1015 664 553 553
q18 7132 6950 6971 6950
q19 1523 1441 1378 1378
q20 508 306 298 298
q21 3091 2615 2637 2615
q22 344 277 281 277
Total cold run time: 44123 ms
Total hot run time: 39910 ms
run tpch-sf100 query with default conf and set session variable runtime_filter_mode=off
q1 4415 4358 4368 4358
q2 266 161 171 161
q3 3531 3520 3511 3511
q4 2378 2364 2366 2364
q5 5722 5723 5708 5708
q6 240 121 120 120
q7 2381 1865 1853 1853
q8 3503 3504 3510 3504
q9 8935 8897 8976 8897
q10 3863 4003 3972 3972
q11 506 373 395 373
q12 768 597 602 597
q13 4279 3573 3575 3573
q14 280 259 258 258
q15 565 519 526 519
q16 500 475 467 467
q17 1861 1839 1839 1839
q18 8727 8337 8259 8259
q19 1716 1750 1755 1750
q20 2242 1949 1928 1928
q21 6505 6190 6184 6184
q22 506 419 420 419
Total cold run time: 63689 ms
Total hot run time: 60614 ms
```
---------------------------------------------------------------------
Re: [PR] [feature](Load)(step2)support nereids load job schedule [doris]
Posted by "doris-robot (via GitHub)" <gi...@apache.org>.
doris-robot commented on PR #26356:
URL: https://github.com/apache/doris/pull/26356#issuecomment-1860237208
TPC-H test result on machine: 'aliyun_ecs.c7a.8xlarge_32C64G'
```
Tpch sf100 test result on commit 6004f9eafd4d55c89a19d1fa519deb1d5cd9bb5d, data reload: false
run tpch-sf100 query with default conf and session variables
q1 4715 4459 4497 4459
q2 374 142 158 142
q3 1458 1268 1240 1240
q4 1113 909 931 909
q5 3123 3146 3174 3146
q6 248 124 129 124
q7 996 478 487 478
q8 2201 2228 2186 2186
q9 6720 6659 6687 6659
q10 3205 3291 3258 3258
q11 317 207 204 204
q12 357 209 204 204
q13 4549 3805 3817 3805
q14 242 213 213 213
q15 565 527 548 527
q16 438 388 384 384
q17 1019 691 581 581
q18 7157 6904 6945 6904
q19 1534 1419 1385 1385
q20 517 282 292 282
q21 3008 2633 2674 2633
q22 350 272 278 272
Total cold run time: 44206 ms
Total hot run time: 39995 ms
run tpch-sf100 query with default conf and set session variable runtime_filter_mode=off
q1 4406 4390 4392 4390
q2 266 162 172 162
q3 3538 3524 3536 3524
q4 2384 2378 2377 2377
q5 5734 5734 5701 5701
q6 240 123 120 120
q7 2385 1864 1900 1864
q8 3521 3538 3520 3520
q9 9040 9036 8981 8981
q10 3912 3976 3999 3976
q11 512 372 388 372
q12 755 586 603 586
q13 4280 3567 3582 3567
q14 291 257 260 257
q15 562 512 526 512
q16 497 461 446 446
q17 1870 1851 1876 1851
q18 8623 8277 8160 8160
q19 1742 1764 1746 1746
q20 2272 1942 1952 1942
q21 6524 6165 6183 6165
q22 498 417 415 415
Total cold run time: 63852 ms
Total hot run time: 60634 ms
```
---------------------------------------------------------------------
Re: [PR] [feature](Load)(step2)support nereids load job schedule [doris]
Posted by "shuke987 (via GitHub)" <gi...@apache.org>.
shuke987 commented on PR #26356:
URL: https://github.com/apache/doris/pull/26356#issuecomment-1859500959
run buildall
---------------------------------------------------------------------
Re: [PR] [feature](Load)(step2)support nereids load job schedule [doris]
Posted by "wsjz (via GitHub)" <gi...@apache.org>.
wsjz commented on PR #26356:
URL: https://github.com/apache/doris/pull/26356#issuecomment-1859394343
run buildall
---------------------------------------------------------------------
Re: [PR] [feature](Load)(step2)support nereids load job schedule [doris]
Posted by "wsjz (via GitHub)" <gi...@apache.org>.
wsjz commented on code in PR #26356:
URL: https://github.com/apache/doris/pull/26356#discussion_r1434742971
##########
fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java:
##########
@@ -154,17 +201,18 @@ public List<T> commonCreateTasks(TaskType taskType, C taskContext) {
return createTasks(taskType, taskContext);
}
- public void initTasks(List<? extends AbstractTask> tasks) {
+ public void initTasks(Collection<? extends T> tasks, TaskType taskType) {
+ if (CollectionUtils.isEmpty(getRunningTasks())) {
+ runningTasks = new ArrayList<>();
+ }
tasks.forEach(task -> {
- task.setJobId(jobId);
- task.setTaskId(getNextId());
+ task.setTaskType(taskType);
+ task.setJobId(getJobId());
Review Comment:
Set the task ID in the task constructor. I think all of these should be put into the task's own constructor, not set elsewhere.
---------------------------------------------------------------------
Re: [PR] [feature](Load)(step2)support nereids load job schedule [doris]
Posted by "zddr (via GitHub)" <gi...@apache.org>.
zddr commented on code in PR #26356:
URL: https://github.com/apache/doris/pull/26356#discussion_r1434740764
##########
fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java:
##########
@@ -154,17 +201,18 @@ public List<T> commonCreateTasks(TaskType taskType, C taskContext) {
return createTasks(taskType, taskContext);
}
- public void initTasks(List<? extends AbstractTask> tasks) {
+ public void initTasks(Collection<? extends T> tasks, TaskType taskType) {
+ if (CollectionUtils.isEmpty(getRunningTasks())) {
+ runningTasks = new ArrayList<>();
+ }
tasks.forEach(task -> {
- task.setJobId(jobId);
- task.setTaskId(getNextId());
+ task.setTaskType(taskType);
+ task.setJobId(getJobId());
Review Comment:
Why remove `task.setTaskId(getNextId());`?
---------------------------------------------------------------------
Re: [PR] [feature](Load)(step2)support nereids load job schedule [doris]
Posted by "zddr (via GitHub)" <gi...@apache.org>.
zddr commented on code in PR #26356:
URL: https://github.com/apache/doris/pull/26356#discussion_r1434740764
##########
fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java:
##########
@@ -154,17 +201,18 @@ public List<T> commonCreateTasks(TaskType taskType, C taskContext) {
return createTasks(taskType, taskContext);
}
- public void initTasks(List<? extends AbstractTask> tasks) {
+ public void initTasks(Collection<? extends T> tasks, TaskType taskType) {
+ if (CollectionUtils.isEmpty(getRunningTasks())) {
+ runningTasks = new ArrayList<>();
+ }
tasks.forEach(task -> {
- task.setJobId(jobId);
- task.setTaskId(getNextId());
+ task.setTaskType(taskType);
+ task.setJobId(getJobId());
Review Comment:
Why remove `task.setTaskId(getNextId());`?
---------------------------------------------------------------------
Re: [PR] [feature](Load)(step2)support nereids load job schedule [doris]
Posted by "wsjz (via GitHub)" <gi...@apache.org>.
wsjz commented on code in PR #26356:
URL: https://github.com/apache/doris/pull/26356#discussion_r1434217621
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/load/LabelProcessor.java:
##########
@@ -0,0 +1,181 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.jobs.load;
+
+import org.apache.doris.catalog.Database;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.CaseSensibility;
+import org.apache.doris.common.LabelAlreadyUsedException;
+import org.apache.doris.common.PatternMatcher;
+import org.apache.doris.common.PatternMatcherWrapper;
+import org.apache.doris.job.exception.JobException;
+import org.apache.doris.job.extensions.insert.InsertJob;
+
+import com.google.common.base.Strings;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+
+/**
+ * label manager
+ */
+public class LabelProcessor {
+ private final Map<Long, Map<String, List<InsertJob>>> dbIdToLabelToLoadJobs = new ConcurrentHashMap<>();
+ private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
+
+ private void readLock() {
+ lock.readLock().lock();
+ }
+
+ private void readUnlock() {
+ lock.readLock().unlock();
+ }
+
+ private void writeLock() {
+ lock.writeLock().lock();
+ }
+
+ private void writeUnlock() {
+ lock.writeLock().unlock();
+ }
+
+ /**
+ * get jobs with label
+ * @param db db
+ * @return jobs
+ * @throws JobException e
+ */
+ public List<InsertJob> getJobs(Database db) throws JobException {
+ readLock();
+ try {
+ Map<String, List<InsertJob>> labelToLoadJobs = dbIdToLabelToLoadJobs.get(db.getId());
+ if (labelToLoadJobs == null) {
+ throw new JobException("Load job does not exist");
+ }
+ return labelToLoadJobs.values().stream().flatMap(Collection::stream).collect(Collectors.toList());
+ } finally {
+ readUnlock();
+ }
+ }
+
+ /**
+ * add job with label
+ *
+ * @param job job with label
+ * @throws LabelAlreadyUsedException e
+ */
+ public void addJob(InsertJob job) throws LabelAlreadyUsedException {
+ writeLock();
+ try {
+ Map<String, List<InsertJob>> labelToLoadJobs;
+ if (!dbIdToLabelToLoadJobs.containsKey(job.getDbId())) {
+ labelToLoadJobs = new ConcurrentHashMap<>();
+ dbIdToLabelToLoadJobs.put(job.getDbId(), labelToLoadJobs);
+ }
+ labelToLoadJobs = dbIdToLabelToLoadJobs.get(job.getDbId());
+ if (labelToLoadJobs.containsKey(job.getLabelName())) {
+ throw new LabelAlreadyUsedException(job.getLabelName());
+ } else {
+ labelToLoadJobs.put(job.getLabelName(), new ArrayList<>());
+ }
+ labelToLoadJobs.get(job.getLabelName()).add(job);
+ } finally {
+ writeUnlock();
+ }
+ }
+
+ /**
+ * support remove label job
+ * @param dbId db id
+ * @param labelName label name
+ */
+ public void removeJob(long dbId, String labelName) {
Review Comment:
TODO: this will be used by the method that cleans old labels.
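The body of `removeJob` is not shown in this hunk. As an illustration only, the sketch below shows one way label removal and expiry of old labels could work under the same write lock. It uses a hypothetical `LabelCleanupSketch` that maps db ID to label to finish time (ms) rather than storing `InsertJob`s. The TTL-based policy is an assumption, not the PR's design.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/** Hypothetical sketch of label removal and expiry: dbId -> label -> finish time (ms). */
public class LabelCleanupSketch {
    private final Map<Long, Map<String, Long>> dbIdToLabelToFinishMs = new HashMap<>();
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);

    public void addLabel(long dbId, String label, long finishMs) {
        lock.writeLock().lock();
        try {
            dbIdToLabelToFinishMs.computeIfAbsent(dbId, k -> new HashMap<>()).put(label, finishMs);
        } finally {
            lock.writeLock().unlock();
        }
    }

    /** Removes one label and drops the per-database map once it becomes empty. */
    public void removeLabel(long dbId, String label) {
        lock.writeLock().lock();
        try {
            Map<String, Long> labels = dbIdToLabelToFinishMs.get(dbId);
            if (labels == null) {
                return;
            }
            labels.remove(label);
            if (labels.isEmpty()) {
                dbIdToLabelToFinishMs.remove(dbId);
            }
        } finally {
            lock.writeLock().unlock();
        }
    }

    /** Removes labels that finished more than ttlMs before nowMs; returns the count removed. */
    public int cleanOldLabels(long nowMs, long ttlMs) {
        lock.writeLock().lock();
        try {
            int removed = 0;
            Iterator<Map.Entry<Long, Map<String, Long>>> dbIt =
                    dbIdToLabelToFinishMs.entrySet().iterator();
            while (dbIt.hasNext()) {
                Map<String, Long> labels = dbIt.next().getValue();
                int before = labels.size();
                labels.values().removeIf(finishMs -> nowMs - finishMs > ttlMs);
                removed += before - labels.size();
                if (labels.isEmpty()) {
                    dbIt.remove();
                }
            }
            return removed;
        } finally {
            lock.writeLock().unlock();
        }
    }

    public boolean contains(long dbId, String label) {
        lock.readLock().lock();
        try {
            Map<String, Long> labels = dbIdToLabelToFinishMs.get(dbId);
            return labels != null && labels.containsKey(label);
        } finally {
            lock.readLock().unlock();
        }
    }
}
```

Dropping a database's inner map once it is empty keeps the outer map from accumulating empty entries for databases whose labels have all expired.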
---------------------------------------------------------------------
Re: [PR] [feature](Load)(step2)support nereids load job schedule [doris]
Posted by "morningman (via GitHub)" <gi...@apache.org>.
morningman commented on code in PR #26356:
URL: https://github.com/apache/doris/pull/26356#discussion_r1434185522
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/load/LabelProcessor.java:
##########
@@ -0,0 +1,181 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.jobs.load;
+
+import org.apache.doris.catalog.Database;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.CaseSensibility;
+import org.apache.doris.common.LabelAlreadyUsedException;
+import org.apache.doris.common.PatternMatcher;
+import org.apache.doris.common.PatternMatcherWrapper;
+import org.apache.doris.job.exception.JobException;
+import org.apache.doris.job.extensions.insert.InsertJob;
+
+import com.google.common.base.Strings;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+
+/**
+ * label manager
+ */
+public class LabelProcessor {
+ private final Map<Long, Map<String, List<InsertJob>>> dbIdToLabelToLoadJobs = new ConcurrentHashMap<>();
+ private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
+
+ private void readLock() {
+ lock.readLock().lock();
+ }
+
+ private void readUnlock() {
+ lock.readLock().unlock();
+ }
+
+ private void writeLock() {
+ lock.writeLock().lock();
+ }
+
+ private void writeUnlock() {
+ lock.writeLock().unlock();
+ }
+
+ /**
+ * get jobs with label
+ * @param db db
+ * @return jobs
+ * @throws JobException e
+ */
+ public List<InsertJob> getJobs(Database db) throws JobException {
+ readLock();
+ try {
+ Map<String, List<InsertJob>> labelToLoadJobs = dbIdToLabelToLoadJobs.get(db.getId());
+ if (labelToLoadJobs == null) {
+ throw new JobException("Load job does not exist");
+ }
+ return labelToLoadJobs.values().stream().flatMap(Collection::stream).collect(Collectors.toList());
+ } finally {
+ readUnlock();
+ }
+ }
+
+ /**
+ * add job with label
+ *
+ * @param job job with label
+ * @throws LabelAlreadyUsedException e
+ */
+ public void addJob(InsertJob job) throws LabelAlreadyUsedException {
+ writeLock();
+ try {
+ Map<String, List<InsertJob>> labelToLoadJobs;
+ if (!dbIdToLabelToLoadJobs.containsKey(job.getDbId())) {
+ labelToLoadJobs = new ConcurrentHashMap<>();
+ dbIdToLabelToLoadJobs.put(job.getDbId(), labelToLoadJobs);
+ }
+ labelToLoadJobs = dbIdToLabelToLoadJobs.get(job.getDbId());
+ if (labelToLoadJobs.containsKey(job.getLabelName())) {
+ throw new LabelAlreadyUsedException(job.getLabelName());
+ } else {
+ labelToLoadJobs.put(job.getLabelName(), new ArrayList<>());
+ }
+ labelToLoadJobs.get(job.getLabelName()).add(job);
+ } finally {
+ writeUnlock();
+ }
+ }
+
+ /**
+ * support remove label job
+ * @param dbId db id
+ * @param labelName label name
+ */
+ public void removeJob(long dbId, String labelName) {
Review Comment:
Not used?
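The question concerns `removeJob`, whose body is not part of the quoted hunk. The following is a hypothetical sketch, not the PR's code, of how the same db → label → jobs index could look with `computeIfAbsent` and a `removeJob` that also drops empty per-database maps. `LabeledJob` and `LabelIndexSketch` are illustrative stand-ins for `InsertJob` and `LabelProcessor`. `IllegalStateException` replaces `LabelAlreadyUsedException`, and `getJobs` returns an empty list where the diff throws `JobException`.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;

// Hypothetical stand-in for InsertJob: only the fields the label index needs.
final class LabeledJob {
    final long dbId;
    final String labelName;

    LabeledJob(long dbId, String labelName) {
        this.dbId = dbId;
        this.labelName = labelName;
    }
}

// Hypothetical db -> label -> jobs index modeled on LabelProcessor (not the PR's code).
final class LabelIndexSketch {
    private final Map<Long, Map<String, List<LabeledJob>>> dbIdToLabelToJobs = new ConcurrentHashMap<>();
    // Fair read/write lock, as in the quoted diff, so check-then-insert sequences stay atomic.
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);

    void addJob(LabeledJob job) {
        lock.writeLock().lock();
        try {
            Map<String, List<LabeledJob>> labelToJobs =
                    dbIdToLabelToJobs.computeIfAbsent(job.dbId, id -> new ConcurrentHashMap<>());
            if (labelToJobs.containsKey(job.labelName)) {
                // The PR throws LabelAlreadyUsedException; a JDK exception keeps this self-contained.
                throw new IllegalStateException("Label already used: " + job.labelName);
            }
            List<LabeledJob> jobs = new ArrayList<>();
            jobs.add(job);
            labelToJobs.put(job.labelName, jobs);
        } finally {
            lock.writeLock().unlock();
        }
    }

    // Returns an empty list for an unknown database (the quoted getJobs throws JobException instead).
    List<LabeledJob> getJobs(long dbId) {
        lock.readLock().lock();
        try {
            Map<String, List<LabeledJob>> labelToJobs = dbIdToLabelToJobs.get(dbId);
            if (labelToJobs == null) {
                return Collections.emptyList();
            }
            return labelToJobs.values().stream().flatMap(Collection::stream).collect(Collectors.toList());
        } finally {
            lock.readLock().unlock();
        }
    }

    // One possible removeJob: drop the label, then the per-database map once it is empty.
    void removeJob(long dbId, String labelName) {
        lock.writeLock().lock();
        try {
            Map<String, List<LabeledJob>> labelToJobs = dbIdToLabelToJobs.get(dbId);
            if (labelToJobs == null) {
                return;
            }
            labelToJobs.remove(labelName);
            if (labelToJobs.isEmpty()) {
                dbIdToLabelToJobs.remove(dbId);
            }
        } finally {
            lock.writeLock().unlock();
        }
    }
}
```

The write lock is what makes the `containsKey` check and the insert a single atomic step. `ConcurrentHashMap` on its own makes individual operations thread-safe, but not that check-then-act sequence.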
Re: [PR] [feature](Load)(step2)support nereids load job schedule [doris]
Posted by "wsjz (via GitHub)" <gi...@apache.org>.
wsjz commented on PR #26356:
URL: https://github.com/apache/doris/pull/26356#issuecomment-1868799775
run buildall
Re: [PR] [feature](Load)(step2)support nereids load job schedule [doris]
Posted by "doris-robot (via GitHub)" <gi...@apache.org>.
doris-robot commented on PR #26356:
URL: https://github.com/apache/doris/pull/26356#issuecomment-1855435006
(From new machine) TeamCity pipeline, ClickBench performance test result:
the sum of best hot time: 44.31 seconds
stream load tsv: 588 seconds loaded 74807831229 Bytes, about 121 MB/s
stream load json: 19 seconds loaded 2358488459 Bytes, about 118 MB/s
stream load orc: 67 seconds loaded 1101869774 Bytes, about 15 MB/s
stream load parquet: 33 seconds loaded 861443392 Bytes, about 24 MB/s
insert into select: 28.9 seconds inserted 10000000 Rows, about 346K ops/s
storage size: 17219723249 Bytes
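The robot's "MB/s" figures match integer division by 2^20 bytes (MiB), not 10^6. For example, 74807831229 bytes in 588 s is about 127.2 million B/s but about 121.3 MiB/s, and the robot reports 121. The ORC and Parquet rates (about 15.7 and 24.9 MiB/s before the fraction is dropped, reported as 15 and 24) show that the value is truncated, not rounded. This hypothetical helper, not the robot's actual code, reproduces all five rates under that assumption:

```java
// Hypothetical helper reproducing the robot's throughput figures (MiB = 2^20 bytes, truncated).
final class ThroughputSketch {
    private static final long MIB = 1024L * 1024L;

    // Whole MiB per second; long division drops the fractional part at each step.
    static long mibPerSecond(long bytes, long seconds) {
        return bytes / seconds / MIB;
    }

    // Rows per second in whole thousands, i.e. the "K" in "346K ops/s".
    static long kiloRowsPerSecond(long rows, double seconds) {
        return (long) (rows / seconds) / 1000;
    }
}
```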
Re: [PR] [feature](Load)(step2)support nereids load job schedule [doris]
Posted by "wsjz (via GitHub)" <gi...@apache.org>.
wsjz commented on code in PR #26356:
URL: https://github.com/apache/doris/pull/26356#discussion_r1424866033
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertIntoTableCommand.java:
##########
@@ -101,6 +105,14 @@ public void setJobId(long jobId) {
@Override
public void run(ConnectContext ctx, StmtExecutor executor) throws Exception {
+ runInternal(ctx, executor);
+ }
+
+ public void statefulRun(ConnectContext ctx, StmtExecutor executor) throws Exception {
Review Comment:
> Looks like `statefulRun()` is same as `run()`?
Change it to:
`void runWithStatistic(ConnectContext ctx, StmtExecutor executor, LoadStatistic loadStatistic);`
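The review point is that `statefulRun()` duplicated `run()`. A separate signature that carries a `LoadStatistic` makes the difference visible at the call site. Below is a hypothetical sketch of that split. `ConnectContextStub` and `StmtExecutorStub` stand in for the Doris FE classes. The counters in `LoadStatisticSketch` are invented, because the PR's `LoadStatistic` fields are not shown here. The sketch also omits the `throws Exception` that the real `run()` declares.

```java
// Hypothetical stubs standing in for Doris FE's ConnectContext and StmtExecutor.
final class ConnectContextStub { }

final class StmtExecutorStub { }

// Hypothetical statistic; these counters are invented for illustration.
final class LoadStatisticSketch {
    long loadedRows;
    long loadedBytes;
}

final class InsertCommandSketch {
    // Interactive INSERT path: nothing is recorded.
    void run(ConnectContextStub ctx, StmtExecutorStub executor) {
        runInternal(ctx, executor, null);
    }

    // Scheduled-job path: identical execution, but the caller's statistic is filled in.
    void runWithStatistic(ConnectContextStub ctx, StmtExecutorStub executor, LoadStatisticSketch loadStatistic) {
        runInternal(ctx, executor, loadStatistic);
    }

    private void runInternal(ConnectContextStub ctx, StmtExecutorStub executor, LoadStatisticSketch loadStatistic) {
        // Placeholder for planning and executing the insert: pretend one batch was written.
        long rows = 100;
        long bytes = 4096;
        if (loadStatistic != null) {
            loadStatistic.loadedRows += rows;
            loadStatistic.loadedBytes += bytes;
        }
    }
}
```

Both entry points share one execution path, so the only behavioral difference is whether a statistic object receives the results.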
Re: [PR] [feature](Load)(step2)support nereids load job schedule [doris]
Posted by "wsjz (via GitHub)" <gi...@apache.org>.
wsjz commented on PR #26356:
URL: https://github.com/apache/doris/pull/26356#issuecomment-1851170290
run buildall
Re: [PR] [feature](Load)(step2)support nereids load job schedule [doris]
Posted by "wsjz (via GitHub)" <gi...@apache.org>.
wsjz commented on code in PR #26356:
URL: https://github.com/apache/doris/pull/26356#discussion_r1434742971
##########
fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java:
##########
@@ -154,17 +201,18 @@ public List<T> commonCreateTasks(TaskType taskType, C taskContext) {
return createTasks(taskType, taskContext);
}
- public void initTasks(List<? extends AbstractTask> tasks) {
+ public void initTasks(Collection<? extends T> tasks, TaskType taskType) {
+ if (CollectionUtils.isEmpty(getRunningTasks())) {
+ runningTasks = new ArrayList<>();
+ }
tasks.forEach(task -> {
- task.setJobId(jobId);
- task.setTaskId(getNextId());
+ task.setTaskType(taskType);
+ task.setJobId(getJobId());
Review Comment:
Set the task ID in the task constructor. I think all of these fields should be set in the task's own constructor, not elsewhere.
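The suggestion is that a task should be fully initialized when it is constructed, instead of having its ID, job ID and type filled in later by `initTasks`. Below is a hypothetical sketch of that shape. `TaskSketch`, `JobSketch`, `TaskTypeSketch` (with its `SCHEDULED`/`MANUAL` values) and the static `AtomicLong` ID source are illustrative assumptions, not Doris classes.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical task types for illustration.
enum TaskTypeSketch { SCHEDULED, MANUAL }

// Hypothetical task: every identifying field is final and assigned in the constructor.
final class TaskSketch {
    // Hypothetical process-wide ID source; a real FE would use its own ID generator.
    private static final AtomicLong NEXT_ID = new AtomicLong(1);

    private final long taskId;
    private final long jobId;
    private final TaskTypeSketch taskType;

    TaskSketch(long jobId, TaskTypeSketch taskType) {
        this.taskId = NEXT_ID.getAndIncrement();
        this.jobId = jobId;
        this.taskType = taskType;
    }

    long getTaskId() { return taskId; }

    long getJobId() { return jobId; }

    TaskTypeSketch getTaskType() { return taskType; }
}

// Hypothetical job: creates fully built tasks; initTasks only registers them.
final class JobSketch {
    private final long jobId;
    private final List<TaskSketch> runningTasks = new ArrayList<>();

    JobSketch(long jobId) {
        this.jobId = jobId;
    }

    List<TaskSketch> createTasks(int count, TaskTypeSketch taskType) {
        List<TaskSketch> tasks = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            tasks.add(new TaskSketch(jobId, taskType));
        }
        return tasks;
    }

    void initTasks(Collection<TaskSketch> tasks) {
        runningTasks.addAll(tasks);
    }

    List<TaskSketch> getRunningTasks() { return runningTasks; }
}
```

Because the fields are final, no task can be seen without an ID, and `initTasks` is reduced to bookkeeping.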
Re: [PR] [feature](Load)(step2)support nereids load job schedule [doris]
Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #26356:
URL: https://github.com/apache/doris/pull/26356#issuecomment-1867246300
PR approved by anyone and no changes requested.
Re: [PR] [feature](Load)(step2)support nereids load job schedule [doris]
Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #26356:
URL: https://github.com/apache/doris/pull/26356#issuecomment-1868926386
PR approved by at least one committer and no changes requested.
Re: [PR] [feature](Load)(step2)support nereids load job schedule [doris]
Posted by "doris-robot (via GitHub)" <gi...@apache.org>.
doris-robot commented on PR #26356:
URL: https://github.com/apache/doris/pull/26356#issuecomment-1868834531
TPC-H test result on machine: 'aliyun_ecs.c7a.8xlarge_32C64G'
```
Tpch sf100 test result on commit be4617365b39cf5059ec0b65912908484d93fb66, data reload: false
run tpch-sf100 query with default conf and session variables
q1 4713 4409 4409 4409
q2 369 142 162 142
q3 1466 1278 1277 1277
q4 1101 852 867 852
q5 3169 3159 3182 3159
q6 255 129 131 129
q7 977 476 492 476
q8 2169 2223 2186 2186
q9 6707 6663 6662 6662
q10 3224 3268 3287 3268
q11 310 190 183 183
q12 359 210 207 207
q13 4554 3780 3790 3780
q14 234 209 211 209
q15 571 522 536 522
q16 446 388 392 388
q17 1022 663 575 575
q18 7068 6748 6872 6748
q19 1548 1414 1376 1376
q20 525 316 321 316
q21 3098 2675 2647 2647
q22 351 281 288 281
Total cold run time: 44236 ms
Total hot run time: 39792 ms
run tpch-sf100 query with default conf and set session variable runtime_filter_mode=off
q1 4368 4326 4313 4313
q2 273 175 176 175
q3 3519 3510 3525 3510
q4 2403 2382 2382 2382
q5 5714 5739 5739 5739
q6 246 123 126 123
q7 2358 1848 1858 1848
q8 3533 3553 3542 3542
q9 9047 9003 8979 8979
q10 3907 4007 3991 3991
q11 495 377 368 368
q12 771 582 617 582
q13 4301 3554 3554 3554
q14 285 248 247 247
q15 578 528 525 525
q16 485 442 462 442
q17 1892 1876 1867 1867
q18 8525 8144 8081 8081
q19 1761 1743 1740 1740
q20 2249 1962 1933 1933
q21 6508 6139 6173 6139
q22 518 445 397 397
Total cold run time: 63736 ms
Total hot run time: 60477 ms
```
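The robot's table has no column header, but the totals determine what the columns mean. The first column sums to the reported 44236 ms cold total. The last column, which in every row equals the smaller of the second and third values, sums to the reported 39792 ms hot total. Each row therefore reads as: query, cold run, two hot runs, best hot run (all in ms). This hypothetical check, not the robot's code, recomputes both totals from the default-conf rows:

```java
// Hypothetical helper for checking the robot's TPC-H totals; column meaning is inferred from the sums.
final class TpchTotalsSketch {
    // Default-conf rows for commit be46173 as {cold, hot run 1, hot run 2} in ms, q1..q22.
    static final long[][] DEFAULT_CONF_BE46173 = {
        {4713, 4409, 4409}, {369, 142, 162}, {1466, 1278, 1277}, {1101, 852, 867},
        {3169, 3159, 3182}, {255, 129, 131}, {977, 476, 492}, {2169, 2223, 2186},
        {6707, 6663, 6662}, {3224, 3268, 3287}, {310, 190, 183}, {359, 210, 207},
        {4554, 3780, 3790}, {234, 209, 211}, {571, 522, 536}, {446, 388, 392},
        {1022, 663, 575}, {7068, 6748, 6872}, {1548, 1414, 1376}, {525, 316, 321},
        {3098, 2675, 2647}, {351, 281, 288},
    };

    // Sum of the first (cold) run of every query.
    static long totalColdMs(long[][] rows) {
        long total = 0;
        for (long[] row : rows) {
            total += row[0];
        }
        return total;
    }

    // Sum over queries of the faster hot run, i.e. the table's last column.
    static long totalBestHotMs(long[][] rows) {
        long total = 0;
        for (long[] row : rows) {
            total += Math.min(row[1], row[2]);
        }
        return total;
    }
}
```

The same arithmetic also reproduces the `runtime_filter_mode=off` totals for this commit (63736 ms cold, 60477 ms hot).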
Re: [PR] [feature](Load)(step2)support nereids load job schedule [doris]
Posted by "doris-robot (via GitHub)" <gi...@apache.org>.
doris-robot commented on PR #26356:
URL: https://github.com/apache/doris/pull/26356#issuecomment-1862091444
TPC-H test result on machine: 'aliyun_ecs.c7a.8xlarge_32C64G'
```
Tpch sf100 test result on commit 6004f9eafd4d55c89a19d1fa519deb1d5cd9bb5d, data reload: false
run tpch-sf100 query with default conf and session variables
q1 4646 4451 4465 4451
q2 359 152 160 152
q3 1475 1250 1202 1202
q4 1113 876 886 876
q5 3142 3130 3164 3130
q6 246 129 130 129
q7 996 499 481 481
q8 2166 2231 2177 2177
q9 6682 6630 6665 6630
q10 3218 3273 3286 3273
q11 309 177 189 177
q12 358 210 204 204
q13 4549 3785 3791 3785
q14 241 210 218 210
q15 573 524 520 520
q16 437 380 384 380
q17 1024 626 606 606
q18 7267 6947 6960 6947
q19 1577 1412 1372 1372
q20 501 304 286 286
q21 3042 2639 2695 2639
q22 360 280 286 280
Total cold run time: 44281 ms
Total hot run time: 39907 ms
run tpch-sf100 query with default conf and set session variable runtime_filter_mode=off
q1 4343 4362 4354 4354
q2 266 163 174 163
q3 3546 3529 3527 3527
q4 2386 2376 2360 2360
q5 5728 5771 5756 5756
q6 240 121 123 121
q7 2379 1903 1870 1870
q8 3523 3533 3536 3533
q9 9012 9017 8988 8988
q10 3915 4004 4025 4004
q11 492 380 369 369
q12 764 593 597 593
q13 4302 3578 3556 3556
q14 281 258 264 258
q15 567 519 527 519
q16 504 463 500 463
q17 1859 1866 1854 1854
q18 8740 8202 8387 8202
q19 1767 1807 1817 1807
q20 2259 1947 1942 1942
q21 6522 6206 6181 6181
q22 506 427 442 427
Total cold run time: 63901 ms
Total hot run time: 60847 ms
```
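The two TPC-H reports (commits 6004f9e and be46173) ran on the same machine type without reloading data, so their totals can be compared directly. Here is a small hypothetical helper that takes the 6004f9e run as the baseline:

```java
// Hypothetical helper: relative change between two reported TPC-H totals.
final class TpchComparisonSketch {
    // Percent change of candidateMs relative to baselineMs; negative means the candidate was faster.
    static double percentChange(long baselineMs, long candidateMs) {
        return 100.0 * (candidateMs - baselineMs) / baselineMs;
    }
}
```

Applied to the reported hot totals, this gives about -0.29% for the default configuration (39907 ms to 39792 ms) and about -0.61% with `runtime_filter_mode=off` (60847 ms to 60477 ms). A single pair of runs cannot show whether differences below 1% are more than run-to-run variation.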