You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by xx...@apache.org on 2023/01/04 02:34:56 UTC
[kylin] 02/03: KYLIN-5369 Fix job is running, the job fails after the KE restart and failure information is displayed for several times
This is an automated email from the ASF dual-hosted git repository.
xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit aaa1debe72f3f9ab915b54b4bb4c5d399842852d
Author: sibingzhang <74...@users.noreply.github.com>
AuthorDate: Tue Nov 1 15:51:43 2022 +0800
KYLIN-5369 Fix job is running, the job fails after the KE restart and failure information is displayed for several times
---
.../org/apache/kylin/rest/config/initialize/BootstrapCommand.java | 6 ++++--
.../apache/kylin/rest/config/initialize/EpochChangedListener.java | 5 +++--
.../org/apache/kylin/job/impl/threadpool/NDefaultScheduler.java | 8 ++++++++
.../src/main/java/org/apache/kylin/job/runners/FetcherRunner.java | 8 ++++++++
4 files changed, 23 insertions(+), 4 deletions(-)
diff --git a/src/common-service/src/main/java/org/apache/kylin/rest/config/initialize/BootstrapCommand.java b/src/common-service/src/main/java/org/apache/kylin/rest/config/initialize/BootstrapCommand.java
index 955b59c3cb..f5a682bb8f 100644
--- a/src/common-service/src/main/java/org/apache/kylin/rest/config/initialize/BootstrapCommand.java
+++ b/src/common-service/src/main/java/org/apache/kylin/rest/config/initialize/BootstrapCommand.java
@@ -29,6 +29,8 @@ import org.springframework.stereotype.Component;
import lombok.val;
import lombok.extern.slf4j.Slf4j;
+import java.util.concurrent.atomic.AtomicBoolean;
+
@Slf4j
@Component
public class BootstrapCommand implements Runnable {
@@ -50,8 +52,8 @@ public class BootstrapCommand implements Runnable {
}
void initProject(KylinConfig config, final ProjectInstance project) {
+ NDefaultScheduler scheduler = NDefaultScheduler.getInstance(project.getName());
EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
- NDefaultScheduler scheduler = NDefaultScheduler.getInstance(project.getName());
scheduler.init(new JobEngineConfig(config));
if (!scheduler.hasStarted()) {
throw new RuntimeException("Scheduler for " + project.getName() + " has not been started");
@@ -59,7 +61,7 @@ public class BootstrapCommand implements Runnable {
return 0;
}, project.getName(), 1, UnitOfWork.DEFAULT_EPOCH_ID);
-
+ scheduler.setHasFinishedTransactions(new AtomicBoolean(true));
log.info("init project {} finished", project.getName());
}
}
diff --git a/src/common-service/src/main/java/org/apache/kylin/rest/config/initialize/EpochChangedListener.java b/src/common-service/src/main/java/org/apache/kylin/rest/config/initialize/EpochChangedListener.java
index e86435d6d3..6165322f40 100644
--- a/src/common-service/src/main/java/org/apache/kylin/rest/config/initialize/EpochChangedListener.java
+++ b/src/common-service/src/main/java/org/apache/kylin/rest/config/initialize/EpochChangedListener.java
@@ -18,6 +18,7 @@
package org.apache.kylin.rest.config.initialize;
import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.persistence.ResourceStore;
@@ -93,8 +94,8 @@ public class EpochChangedListener {
}
log.info("start thread of project: {}", project);
+ NDefaultScheduler scheduler = NDefaultScheduler.getInstance(project);
EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
- NDefaultScheduler scheduler = NDefaultScheduler.getInstance(project);
scheduler.init(new JobEngineConfig(kylinConfig));
if (!scheduler.hasStarted()) {
throw new RuntimeException("Scheduler for " + project + " has not been started");
@@ -104,7 +105,6 @@ public class EpochChangedListener {
if (!ss.getHasStarted().get()) {
throw new RuntimeException("Streaming Scheduler for " + project + " has not been started");
}
-
QueryHistoryTaskScheduler qhAccelerateScheduler = QueryHistoryTaskScheduler.getInstance(project);
qhAccelerateScheduler.init();
@@ -115,6 +115,7 @@ public class EpochChangedListener {
recommendationUpdateScheduler.addProject(project);
return 0;
}, project, 1);
+ scheduler.setHasFinishedTransactions(new AtomicBoolean(true));
} else {
//TODO need global leader
CreateAdminUserUtils.createAllAdmins(userService, env);
diff --git a/src/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/NDefaultScheduler.java b/src/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/NDefaultScheduler.java
index 8297b92239..d3efe67bb9 100644
--- a/src/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/NDefaultScheduler.java
+++ b/src/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/NDefaultScheduler.java
@@ -29,6 +29,7 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import lombok.Setter;
import org.apache.commons.lang3.RandomUtils;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.ExecutorServiceUtil;
@@ -69,6 +70,9 @@ public class NDefaultScheduler implements Scheduler<AbstractExecutable> {
private ExecutableContext context;
private AtomicBoolean initialized = new AtomicBoolean(false);
private AtomicBoolean hasStarted = new AtomicBoolean(false);
+
+ @Setter
+ private AtomicBoolean hasFinishedTransactions = new AtomicBoolean(false);
@Getter
private JobEngineConfig jobEngineConfig;
@Getter
@@ -226,6 +230,10 @@ public class NDefaultScheduler implements Scheduler<AbstractExecutable> {
return hasStarted.get();
}
+ public boolean hasFinishedTransactions() {
+ return hasFinishedTransactions.get();
+ }
+
public static double currentAvailableMem() {
return 1.0 * memoryRemaining.availablePermits();
}
diff --git a/src/core-job/src/main/java/org/apache/kylin/job/runners/FetcherRunner.java b/src/core-job/src/main/java/org/apache/kylin/job/runners/FetcherRunner.java
index 93f9d89bdc..ab659c48a8 100644
--- a/src/core-job/src/main/java/org/apache/kylin/job/runners/FetcherRunner.java
+++ b/src/core-job/src/main/java/org/apache/kylin/job/runners/FetcherRunner.java
@@ -45,6 +45,8 @@ public class FetcherRunner extends AbstractDefaultSchedulerRunner {
private final ScheduledExecutorService fetcherPool;
+ private boolean reSchedule = true;
+
public FetcherRunner(NDefaultScheduler nDefaultScheduler, ExecutorService jobPool,
ScheduledExecutorService fetcherPool) {
super(nDefaultScheduler);
@@ -95,6 +97,12 @@ public class FetcherRunner extends AbstractDefaultSchedulerRunner {
@Override
public void doRun() {
try {
+ // Skip job scheduling at most once while the transaction that initializes the NDefaultScheduler is unfinished,
+ // so that jobs in the project can still be scheduled even if that first transaction fails
+ if (!nDefaultScheduler.hasFinishedTransactions() && reSchedule) {
+ reSchedule = false;
+ return;
+ }
val executableManager = NExecutableManager.getInstance(KylinConfig.getInstanceFromEnv(), project);
Map<String, Executable> runningJobs = context.getRunningJobs();