You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2022/04/14 13:56:37 UTC
[incubator-doris] 02/07: [fix](load) start transaction before we need it (#8819) (#8908)
This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
commit 2fc7fead9344620dd9510b8155992a62e3f3de8b
Author: zhengshengjun <74...@users.noreply.github.com>
AuthorDate: Wed Apr 13 09:50:26 2022 +0800
[fix](load) start transaction before we need it (#8819) (#8908)
---
.../src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java | 7 +++++++
.../java/org/apache/doris/load/loadv2/SparkLoadPendingTask.java | 1 -
2 files changed, 7 insertions(+), 1 deletion(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
index 90c887b393..17c39b904a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
@@ -201,6 +201,13 @@ public class SparkLoadJob extends BulkLoadJob {
@Override
protected void unprotectedExecuteJob() throws LoadException {
+ try {
+ beginTxn();
+ } catch (UserException e) {
+ LOG.warn("failed to begin transaction for spark load job {}", id, e);
+ throw new LoadException(e.getMessage());
+ }
+
// create pending task
LoadTask task = new SparkLoadPendingTask(this, fileGroupAggInfo.getAggKeyToFileGroups(),
sparkResource, brokerDesc);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadPendingTask.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadPendingTask.java
index 85e123544a..d9f9467a3b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadPendingTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadPendingTask.java
@@ -110,7 +110,6 @@ public class SparkLoadPendingTask extends LoadTask {
@Override
void executeTask() throws UserException {
LOG.info("begin to execute spark pending task. load job id: {}", loadJobId);
- ((SparkLoadJob) callback).beginTxn();
submitEtlJob();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org