You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2022/04/14 13:56:37 UTC

[incubator-doris] 02/07: [fix](load) start transaction before we need it (#8819) (#8908)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit 2fc7fead9344620dd9510b8155992a62e3f3de8b
Author: zhengshengjun <74...@users.noreply.github.com>
AuthorDate: Wed Apr 13 09:50:26 2022 +0800

    [fix](load) start transaction before we need it (#8819) (#8908)
---
 .../src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java   | 7 +++++++
 .../java/org/apache/doris/load/loadv2/SparkLoadPendingTask.java    | 1 -
 2 files changed, 7 insertions(+), 1 deletion(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
index 90c887b393..17c39b904a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
@@ -201,6 +201,13 @@ public class SparkLoadJob extends BulkLoadJob {
 
     @Override
     protected void unprotectedExecuteJob() throws LoadException {
+        try {
+            beginTxn();
+        } catch (UserException e) {
+            LOG.warn("failed to begin transaction for spark load job {}", id, e);
+            throw new LoadException(e.getMessage());
+        }
+
         // create pending task
         LoadTask task = new SparkLoadPendingTask(this, fileGroupAggInfo.getAggKeyToFileGroups(),
                                                  sparkResource, brokerDesc);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadPendingTask.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadPendingTask.java
index 85e123544a..d9f9467a3b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadPendingTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadPendingTask.java
@@ -110,7 +110,6 @@ public class SparkLoadPendingTask extends LoadTask {
     @Override
     void executeTask() throws UserException {
         LOG.info("begin to execute spark pending task. load job id: {}", loadJobId);
-        ((SparkLoadJob) callback).beginTxn();
         submitEtlJob();
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org