You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by xx...@apache.org on 2023/01/04 02:34:56 UTC

[kylin] 02/03: KYLIN-5369 Fix running jobs that fail after a KE restart and display their failure information multiple times

This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit aaa1debe72f3f9ab915b54b4bb4c5d399842852d
Author: sibingzhang <74...@users.noreply.github.com>
AuthorDate: Tue Nov 1 15:51:43 2022 +0800

    KYLIN-5369 Fix running jobs that fail after a KE restart and display their failure information multiple times
---
 .../org/apache/kylin/rest/config/initialize/BootstrapCommand.java | 6 ++++--
 .../apache/kylin/rest/config/initialize/EpochChangedListener.java | 5 +++--
 .../org/apache/kylin/job/impl/threadpool/NDefaultScheduler.java   | 8 ++++++++
 .../src/main/java/org/apache/kylin/job/runners/FetcherRunner.java | 8 ++++++++
 4 files changed, 23 insertions(+), 4 deletions(-)

diff --git a/src/common-service/src/main/java/org/apache/kylin/rest/config/initialize/BootstrapCommand.java b/src/common-service/src/main/java/org/apache/kylin/rest/config/initialize/BootstrapCommand.java
index 955b59c3cb..f5a682bb8f 100644
--- a/src/common-service/src/main/java/org/apache/kylin/rest/config/initialize/BootstrapCommand.java
+++ b/src/common-service/src/main/java/org/apache/kylin/rest/config/initialize/BootstrapCommand.java
@@ -29,6 +29,8 @@ import org.springframework.stereotype.Component;
 import lombok.val;
 import lombok.extern.slf4j.Slf4j;
 
+import java.util.concurrent.atomic.AtomicBoolean;
+
 @Slf4j
 @Component
 public class BootstrapCommand implements Runnable {
@@ -50,8 +52,8 @@ public class BootstrapCommand implements Runnable {
     }
 
     void initProject(KylinConfig config, final ProjectInstance project) {
+        NDefaultScheduler scheduler = NDefaultScheduler.getInstance(project.getName());
         EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
-            NDefaultScheduler scheduler = NDefaultScheduler.getInstance(project.getName());
             scheduler.init(new JobEngineConfig(config));
             if (!scheduler.hasStarted()) {
                 throw new RuntimeException("Scheduler for " + project.getName() + " has not been started");
@@ -59,7 +61,7 @@ public class BootstrapCommand implements Runnable {
 
             return 0;
         }, project.getName(), 1, UnitOfWork.DEFAULT_EPOCH_ID);
-
+        scheduler.setHasFinishedTransactions(new AtomicBoolean(true));
         log.info("init project {} finished", project.getName());
     }
 }
diff --git a/src/common-service/src/main/java/org/apache/kylin/rest/config/initialize/EpochChangedListener.java b/src/common-service/src/main/java/org/apache/kylin/rest/config/initialize/EpochChangedListener.java
index e86435d6d3..6165322f40 100644
--- a/src/common-service/src/main/java/org/apache/kylin/rest/config/initialize/EpochChangedListener.java
+++ b/src/common-service/src/main/java/org/apache/kylin/rest/config/initialize/EpochChangedListener.java
@@ -18,6 +18,7 @@
 package org.apache.kylin.rest.config.initialize;
 
 import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.ResourceStore;
@@ -93,8 +94,8 @@ public class EpochChangedListener {
             }
 
             log.info("start thread of project: {}", project);
+            NDefaultScheduler scheduler = NDefaultScheduler.getInstance(project);
             EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
-                NDefaultScheduler scheduler = NDefaultScheduler.getInstance(project);
                 scheduler.init(new JobEngineConfig(kylinConfig));
                 if (!scheduler.hasStarted()) {
                     throw new RuntimeException("Scheduler for " + project + " has not been started");
@@ -104,7 +105,6 @@ public class EpochChangedListener {
                 if (!ss.getHasStarted().get()) {
                     throw new RuntimeException("Streaming Scheduler for " + project + " has not been started");
                 }
-
                 QueryHistoryTaskScheduler qhAccelerateScheduler = QueryHistoryTaskScheduler.getInstance(project);
                 qhAccelerateScheduler.init();
 
@@ -115,6 +115,7 @@ public class EpochChangedListener {
                 recommendationUpdateScheduler.addProject(project);
                 return 0;
             }, project, 1);
+            scheduler.setHasFinishedTransactions(new AtomicBoolean(true));
         } else {
             //TODO need global leader
             CreateAdminUserUtils.createAllAdmins(userService, env);
diff --git a/src/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/NDefaultScheduler.java b/src/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/NDefaultScheduler.java
index 8297b92239..d3efe67bb9 100644
--- a/src/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/NDefaultScheduler.java
+++ b/src/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/NDefaultScheduler.java
@@ -29,6 +29,7 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import lombok.Setter;
 import org.apache.commons.lang3.RandomUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.ExecutorServiceUtil;
@@ -69,6 +70,9 @@ public class NDefaultScheduler implements Scheduler<AbstractExecutable> {
     private ExecutableContext context;
     private AtomicBoolean initialized = new AtomicBoolean(false);
     private AtomicBoolean hasStarted = new AtomicBoolean(false);
+
+    @Setter
+    private AtomicBoolean hasFinishedTransactions = new AtomicBoolean(false);
     @Getter
     private JobEngineConfig jobEngineConfig;
     @Getter
@@ -226,6 +230,10 @@ public class NDefaultScheduler implements Scheduler<AbstractExecutable> {
         return hasStarted.get();
     }
 
+    public boolean hasFinishedTransactions() {
+        return hasFinishedTransactions.get();
+    }
+
     public static double currentAvailableMem() {
         return 1.0 * memoryRemaining.availablePermits();
     }
diff --git a/src/core-job/src/main/java/org/apache/kylin/job/runners/FetcherRunner.java b/src/core-job/src/main/java/org/apache/kylin/job/runners/FetcherRunner.java
index 93f9d89bdc..ab659c48a8 100644
--- a/src/core-job/src/main/java/org/apache/kylin/job/runners/FetcherRunner.java
+++ b/src/core-job/src/main/java/org/apache/kylin/job/runners/FetcherRunner.java
@@ -45,6 +45,8 @@ public class FetcherRunner extends AbstractDefaultSchedulerRunner {
 
     private final ScheduledExecutorService fetcherPool;
 
+    private boolean reSchedule = true;
+
     public FetcherRunner(NDefaultScheduler nDefaultScheduler, ExecutorService jobPool,
             ScheduledExecutorService fetcherPool) {
         super(nDefaultScheduler);
@@ -95,6 +97,12 @@ public class FetcherRunner extends AbstractDefaultSchedulerRunner {
     @Override
     public void doRun() {
         try {
+            // Job schedule is only limited to the transaction in the NDefaultScheduler once
+            // Avoid that if the first transaction fails, jobs in the project cannot be scheduled
+            if (!nDefaultScheduler.hasFinishedTransactions() && reSchedule) {
+                reSchedule = false;
+                return;
+            }
             val executableManager = NExecutableManager.getInstance(KylinConfig.getInstanceFromEnv(), project);
             Map<String, Executable> runningJobs = context.getRunningJobs();