You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by xx...@apache.org on 2023/02/14 05:54:46 UTC

[kylin] 17/33: KYLIN-5436 Fix failure when restarting a build job

This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit ac3459718de13b189523fd272b91a300ad60814f
Author: sibingzhang <74...@users.noreply.github.com>
AuthorDate: Wed Dec 7 14:31:54 2022 +0800

    KYLIN-5436 Fix failure when restarting a build job
---
 .../org/apache/kylin/job/execution/NExecutableManager.java     |  1 -
 .../main/java/org/apache/kylin/job/runners/FetcherRunner.java  | 10 +++-------
 .../src/main/java/org/apache/kylin/job/runners/JobRunner.java  |  1 +
 .../org/apache/kylin/job/execution/NExecutableManagerTest.java |  2 +-
 .../org/apache/kylin/engine/spark/job/NSparkExecutable.java    |  4 +++-
 5 files changed, 8 insertions(+), 10 deletions(-)

diff --git a/src/core-job/src/main/java/org/apache/kylin/job/execution/NExecutableManager.java b/src/core-job/src/main/java/org/apache/kylin/job/execution/NExecutableManager.java
index 5d62003967..512d891320 100644
--- a/src/core-job/src/main/java/org/apache/kylin/job/execution/NExecutableManager.java
+++ b/src/core-job/src/main/java/org/apache/kylin/job/execution/NExecutableManager.java
@@ -1401,7 +1401,6 @@ public class NExecutableManager {
         if (thread != null) {
             logger.info("Interrupt Job [{}] thread and remove in ExecutableContext", executable.getDisplayName());
             thread.interrupt();
-            scheduler.getContext().removeRunningJob(executable);
         }
     }
 
diff --git a/src/core-job/src/main/java/org/apache/kylin/job/runners/FetcherRunner.java b/src/core-job/src/main/java/org/apache/kylin/job/runners/FetcherRunner.java
index 425a228f74..e9e334ecbe 100644
--- a/src/core-job/src/main/java/org/apache/kylin/job/runners/FetcherRunner.java
+++ b/src/core-job/src/main/java/org/apache/kylin/job/runners/FetcherRunner.java
@@ -219,7 +219,6 @@ public class FetcherRunner extends AbstractDefaultSchedulerRunner {
             if (memoryLock) {
                 jobDesc = executable.toString();
                 logger.info("{} prepare to schedule", jobDesc);
-                context.addRunningJob(executable);
                 jobPool.execute(new JobRunner(nDefaultScheduler, executable, this));
                 logger.info("{} scheduled", jobDesc);
             } else {
@@ -227,12 +226,9 @@ public class FetcherRunner extends AbstractDefaultSchedulerRunner {
                         NDefaultScheduler.getMemoryRemaining().availablePermits(), executable.getDisplayName());
             }
         } catch (Exception ex) {
-            if (executable != null) {
-                context.removeRunningJob(executable);
-                if (memoryLock) {
-                    // may release twice when exception raise after jobPool execute executable
-                    NDefaultScheduler.getMemoryRemaining().release(useMemoryCapacity);
-                }
+            if (executable != null && memoryLock) {
+                // may release twice when exception raise after jobPool execute executable
+                NDefaultScheduler.getMemoryRemaining().release(useMemoryCapacity);
             }
             logger.warn("{} fail to schedule", jobDesc, ex);
         }
diff --git a/src/core-job/src/main/java/org/apache/kylin/job/runners/JobRunner.java b/src/core-job/src/main/java/org/apache/kylin/job/runners/JobRunner.java
index 3e839f57f4..74f4fbf365 100644
--- a/src/core-job/src/main/java/org/apache/kylin/job/runners/JobRunner.java
+++ b/src/core-job/src/main/java/org/apache/kylin/job/runners/JobRunner.java
@@ -50,6 +50,7 @@ public class JobRunner extends AbstractDefaultSchedulerRunner {
         val jobIdSimple = executable.getId().substring(0, 8);
         try (SetThreadName ignored = new SetThreadName("JobWorker(project:%s,jobid:%s)", project, jobIdSimple);
                 SetLogCategory logCategory = new SetLogCategory("schedule")) {
+            context.addRunningJob(executable);
             executable.execute(context);
             // trigger the next step asap
             fetcherRunner.scheduleNext();
diff --git a/src/core-job/src/test/java/org/apache/kylin/job/execution/NExecutableManagerTest.java b/src/core-job/src/test/java/org/apache/kylin/job/execution/NExecutableManagerTest.java
index 8aa9bc85b5..8934603053 100644
--- a/src/core-job/src/test/java/org/apache/kylin/job/execution/NExecutableManagerTest.java
+++ b/src/core-job/src/test/java/org/apache/kylin/job/execution/NExecutableManagerTest.java
@@ -907,7 +907,7 @@ public class NExecutableManagerTest extends NLocalFileMetadataTestCase {
         val env = getTestConfig().getDeployEnv();
         getTestConfig().setProperty("kylin.env", "PROD");
         manager.cancelJob(NExecutableManager.toPO(job, DEFAULT_PROJECT), job.getId());
-        Assertions.assertNull(scheduler.getContext().getRunningJobThread(job));
+        Assertions.assertNotNull(scheduler.getContext().getRunningJobThread(job));
         getTestConfig().setProperty("kylin.env", env);
         scheduler.shutdown();
     }
diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
index 5b02c09393..325ac9524f 100644
--- a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
+++ b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
@@ -405,7 +405,8 @@ public class NSparkExecutable extends AbstractExecutable implements ChainedStage
         return desc;
     }
 
-    protected ExecuteResult runSparkSubmit(String hadoopConfDir, String kylinJobJar, String appArgs) {
+    protected ExecuteResult runSparkSubmit(String hadoopConfDir, String kylinJobJar, String appArgs)
+            throws JobStoppedException {
         sparkJobHandler.killOrphanApplicationIfExists(project, getId(), getConfig(), true, getSparkConf());
         try {
             Object cmd;
@@ -432,6 +433,7 @@ public class NSparkExecutable extends AbstractExecutable implements ChainedStage
             }
             return ExecuteResult.createSucceed(output);
         } catch (Exception e) {
+            checkNeedQuit(true);
             logger.warn("failed to execute spark submit command.");
             wrapWithExecuteExceptionUpdateJobError(e);
             return ExecuteResult.createError(e);