You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by xx...@apache.org on 2023/01/06 11:10:14 UTC
[kylin] 09/12: KYLIN-5349 Support project-level configuration of concurrent task limits
This is an automated email from the ASF dual-hosted git repository.
xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 5bca044690dabb78e18e86890d4283bd67b015ae
Author: Hang Jia <75...@qq.com>
AuthorDate: Thu Oct 27 10:57:59 2022 +0800
KYLIN-5349 Support project-level configuration of concurrent task limits
---
.../job/impl/threadpool/NDefaultScheduler.java | 19 ++++--
.../apache/kylin/job/runners/FetcherRunner.java | 27 +++++++-
.../job/impl/threadpool/NDefaultSchedulerTest.java | 79 +++++++++++++++++++++-
.../kylin/rest/service/ModelBuildService.java | 17 ++++-
.../kylin/rest/service/ModelServiceBuildTest.java | 46 +++++++++++++
5 files changed, 177 insertions(+), 11 deletions(-)
diff --git a/src/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/NDefaultScheduler.java b/src/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/NDefaultScheduler.java
index d3efe67bb9..8be23b4c19 100644
--- a/src/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/NDefaultScheduler.java
+++ b/src/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/NDefaultScheduler.java
@@ -32,8 +32,10 @@ import java.util.concurrent.atomic.AtomicBoolean;
import lombok.Setter;
import org.apache.commons.lang3.RandomUtils;
import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.transaction.UnitOfWork;
import org.apache.kylin.common.util.ExecutorServiceUtil;
import org.apache.kylin.common.util.NamedThreadFactory;
+import org.apache.kylin.common.util.SystemInfoCollector;
import org.apache.kylin.job.Scheduler;
import org.apache.kylin.job.engine.JobEngineConfig;
import org.apache.kylin.job.execution.AbstractExecutable;
@@ -43,16 +45,17 @@ import org.apache.kylin.job.runners.FetcherRunner;
import org.apache.kylin.job.runners.JobCheckRunner;
import org.apache.kylin.job.runners.LicenseCapacityCheckRunner;
import org.apache.kylin.job.runners.QuotaStorageCheckRunner;
-import org.apache.kylin.common.persistence.transaction.UnitOfWork;
-import org.apache.kylin.common.util.SystemInfoCollector;
-import org.apache.kylin.metadata.epoch.EpochManager;
+import org.apache.kylin.metadata.project.NProjectManager;
+import org.apache.kylin.metadata.project.ProjectInstance;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import org.apache.kylin.metadata.epoch.EpochManager;
import lombok.Getter;
import lombok.SneakyThrows;
import lombok.val;
@@ -152,7 +155,7 @@ public class NDefaultScheduler implements Scheduler<AbstractExecutable> {
//load all executable, set them to a consistent status
fetcherPool = Executors.newScheduledThreadPool(1,
new NamedThreadFactory("FetchJobWorker(project:" + project + ")"));
- int corePoolSize = jobEngineConfig.getMaxConcurrentJobLimit();
+ int corePoolSize = getMaxConcurrentJobLimitByProject(config, jobEngineConfig, project);
if (config.getAutoSetConcurrentJob()) {
val availableMemoryRate = config.getMaxLocalConsumptionRatio();
synchronized (NDefaultScheduler.class) {
@@ -238,4 +241,12 @@ public class NDefaultScheduler implements Scheduler<AbstractExecutable> {
return 1.0 * memoryRemaining.availablePermits();
}
+ /**
+ * Resolves the maximum number of concurrently running jobs for a project.
+ * Falls back to {@code jobEngineConfig.getMaxConcurrentJobLimit()} when the project name is null/empty
+ * or no such project exists; otherwise reads the limit from the project's own config.
+ *
+ * @param config config used to look up the project manager
+ * @param jobEngineConfig scheduler-wide job engine config used as the fallback
+ * @param project project name, may be null or empty
+ * @return the effective max concurrent job limit
+ */
+ public int getMaxConcurrentJobLimitByProject(KylinConfig config, JobEngineConfig jobEngineConfig, String project) {
+ // check the name before the lookup so a null/empty project never reaches NProjectManager
+ if (Strings.isNullOrEmpty(project)) {
+ return jobEngineConfig.getMaxConcurrentJobLimit();
+ }
+ ProjectInstance prjInstance = NProjectManager.getInstance(config).getProject(project);
+ if (prjInstance == null) {
+ return jobEngineConfig.getMaxConcurrentJobLimit();
+ }
+ return prjInstance.getConfig().getMaxConcurrentJobLimit();
+ }
+
}
diff --git a/src/core-job/src/main/java/org/apache/kylin/job/runners/FetcherRunner.java b/src/core-job/src/main/java/org/apache/kylin/job/runners/FetcherRunner.java
index ab659c48a8..425a228f74 100644
--- a/src/core-job/src/main/java/org/apache/kylin/job/runners/FetcherRunner.java
+++ b/src/core-job/src/main/java/org/apache/kylin/job/runners/FetcherRunner.java
@@ -20,6 +20,7 @@ package org.apache.kylin.job.runners;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.kylin.common.KylinConfig;
@@ -103,6 +104,8 @@ public class FetcherRunner extends AbstractDefaultSchedulerRunner {
reSchedule = false;
return;
}
+ checkAndUpdateJobPoolNum();
+
val executableManager = NExecutableManager.getInstance(KylinConfig.getInstanceFromEnv(), project);
Map<String, Executable> runningJobs = context.getRunningJobs();
@@ -236,7 +239,9 @@ public class FetcherRunner extends AbstractDefaultSchedulerRunner {
}
private boolean isJobPoolFull() {
- if (context.getRunningJobs().size() >= nDefaultScheduler.getJobEngineConfig().getMaxConcurrentJobLimit()) {
+ int corePoolSize = nDefaultScheduler.getMaxConcurrentJobLimitByProject(context.getConfig(),
+ nDefaultScheduler.getJobEngineConfig(), project);
+ if (context.getRunningJobs().size() >= corePoolSize) {
logger.warn("There are too many jobs running, Job Fetch will wait until next schedule time.");
return true;
}
@@ -246,4 +251,24 @@ public class FetcherRunner extends AbstractDefaultSchedulerRunner {
+ /** Schedules this runner on the fetcher pool again with zero delay. */
void scheduleNext() {
fetcherPool.schedule(this, 0, TimeUnit.SECONDS);
}
+
+ /**
+ * Resizes the job thread pool to the project's current max-concurrent-jobs limit.
+ * Growing is applied immediately; shrinking is applied only once the number of
+ * active threads no longer exceeds the new limit. A limit below 1 is ignored.
+ */
+ private void checkAndUpdateJobPoolNum() {
+ final ThreadPoolExecutor pool = (ThreadPoolExecutor) jobPool;
+ int maximumPoolSize = pool.getMaximumPoolSize();
+ int maxConcurrentJobLimit = nDefaultScheduler.getMaxConcurrentJobLimitByProject(context.getConfig(),
+ nDefaultScheduler.getJobEngineConfig(), project);
+ if (maximumPoolSize == maxConcurrentJobLimit) {
+ return;
+ }
+ if (maxConcurrentJobLimit < 1) {
+ // ThreadPoolExecutor rejects a maximum pool size below 1; keep the current size instead of throwing
+ logger.warn("Invalid max concurrent job limit {} for project {}, job pool size is left unchanged.",
+ maxConcurrentJobLimit, project);
+ return;
+ }
+ if (maximumPoolSize < maxConcurrentJobLimit) {
+ // growing: raise the maximum first, because setCorePoolSize rejects core > max (JDK 9+)
+ pool.setMaximumPoolSize(maxConcurrentJobLimit);
+ pool.setCorePoolSize(maxConcurrentJobLimit);
+ return;
+ }
+ // shrinking: wait until the currently active threads fit within the new limit
+ if (pool.getActiveCount() <= maxConcurrentJobLimit) {
+ // lower the core size first, because setMaximumPoolSize rejects max < core
+ pool.setCorePoolSize(maxConcurrentJobLimit);
+ pool.setMaximumPoolSize(maxConcurrentJobLimit);
+ }
+ }
}
diff --git a/src/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/NDefaultSchedulerTest.java b/src/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/NDefaultSchedulerTest.java
index 77990e29ca..f6602d2cd1 100644
--- a/src/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/NDefaultSchedulerTest.java
+++ b/src/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/NDefaultSchedulerTest.java
@@ -71,10 +71,10 @@ import org.apache.kylin.metadata.cube.model.NDataflow;
import org.apache.kylin.metadata.cube.model.NDataflowManager;
import org.apache.kylin.metadata.cube.model.NDataflowUpdate;
import org.apache.kylin.metadata.cube.model.NIndexPlanManager;
-import org.apache.kylin.metadata.epoch.EpochManager;
import org.apache.kylin.metadata.model.ManagementType;
import org.apache.kylin.metadata.model.NDataModelManager;
import org.apache.kylin.metadata.project.EnhancedUnitOfWork;
+import org.apache.kylin.metadata.project.NProjectManager;
import org.apache.kylin.metadata.realization.RealizationStatusEnum;
import org.assertj.core.api.Assertions;
import org.awaitility.core.ConditionTimeoutException;
@@ -93,6 +93,7 @@ import org.slf4j.LoggerFactory;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
+import org.apache.kylin.metadata.epoch.EpochManager;
import lombok.val;
import lombok.var;
@@ -940,8 +941,7 @@ public class NDefaultSchedulerTest extends BaseSchedulerTest {
Assert.assertEquals(RealizationStatusEnum.ONLINE, updateDf.getStatus());
}
- private DefaultExecutable testDataflowStatusWhenJobError(ManagementType tableOriented,
- JobTypeEnum indexBuild) {
+ private DefaultExecutable testDataflowStatusWhenJobError(ManagementType tableOriented, JobTypeEnum indexBuild) {
val dfMgr = NDataflowManager.getInstance(getTestConfig(), project);
val modelMgr = NDataModelManager.getInstance(getTestConfig(), project);
modelMgr.updateDataModel("89af4ee2-2cdb-4b07-b39e-4c29856309aa", copyForWrite -> {
@@ -2026,4 +2026,77 @@ public class NDefaultSchedulerTest extends BaseSchedulerTest {
scheduler.getContext().setReachQuotaLimit(false);
}
}
+
+ @Test
+ @Repeat(3)
+ public void testProjectConcurrentJobLimit() {
+ String project = "heterogeneous_segment";
+ String modelId = "747f864b-9721-4b97-acde-0aa8e8656cba";
+ KylinConfig config = KylinConfig.getInstanceFromEnv();
+ // system-wide limit of one concurrent job; the project-level value is changed below
+ config.setProperty("kylin.job.max-concurrent-jobs", "1");
+ config.setProperty("kylin.engine.driver-memory-base", "512");
+
+ val scheduler = NDefaultScheduler.getInstance(project);
+ val originExecutableManager = NExecutableManager.getInstance(KylinConfig.getInstanceFromEnv(), project);
+ val executableManager = Mockito.spy(originExecutableManager);
+ executableManager.deleteAllJob();
+ Mockito.doAnswer(invocation -> {
+ String jobId = invocation.getArgument(0);
+ originExecutableManager.destroyProcess(jobId);
+ return null;
+ }).when(executableManager).destroyProcess(Mockito.anyString());
+
+ scheduler.init(new JobEngineConfig(config));
+ val projectManager = NProjectManager.getInstance(config);
+
+ if (!scheduler.hasStarted()) {
+ throw new RuntimeException("scheduler has not been started");
+ }
+ int memory = NDefaultScheduler.getMemoryRemaining().availablePermits();
+ val df = NDataflowManager.getInstance(getTestConfig(), project).getDataflow(modelId);
+ val job1 = generateJob(df, project);
+ val job2 = generatePartial(df, project);
+ executableManager.addJob(job1);
+ executableManager.addJob(job2);
+ // limit 1: job1 runs, job2 stays READY
+ waitForJobByStatus(job1.getId(), 60000, ExecutableState.RUNNING, executableManager);
+ Assert.assertNotEquals(memory, NDefaultScheduler.getMemoryRemaining().availablePermits());
+ var runningExecutables = executableManager.getRunningExecutables(project, modelId);
+ runningExecutables.sort(Comparator.comparing(AbstractExecutable::getCreateTime));
+ Assert.assertEquals(ExecutableState.RUNNING, runningExecutables.get(0).getStatus());
+ Assert.assertEquals(ExecutableState.READY, runningExecutables.get(1).getStatus());
+
+ // project-level limit 2: job1 and job2 run, job3 stays READY
+ projectManager.getProject(project).getConfig().setProperty("kylin.job.max-concurrent-jobs", "2");
+ Assert.assertNotEquals(memory, NDefaultScheduler.getMemoryRemaining().availablePermits());
+ val job3 = generateJob(df, project);
+ executableManager.addJob(job3);
+ waitForJobByStatus(job1.getId(), 60000, ExecutableState.RUNNING, executableManager);
+ waitForJobByStatus(job2.getId(), 60000, ExecutableState.RUNNING, executableManager);
+ runningExecutables = executableManager.getRunningExecutables(project, modelId);
+ runningExecutables.sort(Comparator.comparing(AbstractExecutable::getCreateTime));
+ Assert.assertEquals(ExecutableState.RUNNING, runningExecutables.get(0).getStatus());
+ Assert.assertEquals(ExecutableState.RUNNING, runningExecutables.get(1).getStatus());
+ Assert.assertEquals(ExecutableState.READY, runningExecutables.get(2).getStatus());
+
+ // project-level limit back to 1: the two running jobs keep running and job3 keeps waiting
+ projectManager.getProject(project).getConfig().setProperty("kylin.job.max-concurrent-jobs", "1");
+ waitForJobByStatus(job1.getId(), 60000, ExecutableState.RUNNING, executableManager);
+ waitForJobByStatus(job2.getId(), 60000, ExecutableState.RUNNING, executableManager);
+
+ // re-fetch so the assertions reflect the current job list, not the snapshot taken above
+ runningExecutables = executableManager.getRunningExecutables(project, modelId);
+ runningExecutables.sort(Comparator.comparing(AbstractExecutable::getCreateTime));
+ Assert.assertEquals(ExecutableState.RUNNING, runningExecutables.get(0).getStatus());
+ Assert.assertEquals(ExecutableState.RUNNING, runningExecutables.get(1).getStatus());
+ Assert.assertEquals(ExecutableState.READY, runningExecutables.get(2).getStatus());
+
+ // NOTE(review): a null target status presumably waits until job1 is no longer running — confirm helper semantics
+ waitForJobByStatus(job1.getId(), 60000, null, executableManager);
+ runningExecutables = executableManager.getRunningExecutables(project, modelId);
+ Assert.assertEquals(2, runningExecutables.size());
+ runningExecutables.sort(Comparator.comparing(AbstractExecutable::getCreateTime));
+ Assert.assertEquals(ExecutableState.RUNNING, runningExecutables.get(0).getStatus());
+ Assert.assertEquals(ExecutableState.READY, runningExecutables.get(1).getStatus());
+
+ scheduler.shutdown();
+ Assert.assertEquals(memory, NDefaultScheduler.getMemoryRemaining().availablePermits());
+
+ // unknown project falls back to the scheduler-wide limit set above
+ Assert.assertEquals(1,
+ scheduler.getMaxConcurrentJobLimitByProject(config, scheduler.getJobEngineConfig(), "xxxxx"));
+ }
}
diff --git a/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/ModelBuildService.java b/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/ModelBuildService.java
index 5548fd30d5..93724c8c26 100644
--- a/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/ModelBuildService.java
+++ b/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/ModelBuildService.java
@@ -66,6 +66,8 @@ import org.apache.kylin.metadata.model.SegmentStatusEnumToDisplay;
import org.apache.kylin.metadata.model.TableDesc;
import org.apache.kylin.metadata.model.util.MultiPartitionUtil;
import org.apache.kylin.metadata.project.EnhancedUnitOfWork;
+import org.apache.kylin.metadata.project.NProjectManager;
+import org.apache.kylin.metadata.project.ProjectInstance;
import org.apache.kylin.metadata.sourceusage.SourceUsageManager;
import org.apache.kylin.query.util.PushDownUtil;
import org.apache.kylin.rest.aspect.Transaction;
@@ -85,6 +87,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
@@ -471,7 +474,7 @@ public class ModelBuildService extends AbstractModelService implements ModelBuil
String segmentId, Set<Long> partitionIds, int priority, String yarnQueue, Object tag) {
val jobIds = Lists.<String> newArrayList();
if (parallelBuild) {
- checkConcurrentSubmit(partitionIds.size());
+ checkConcurrentSubmit(partitionIds.size(), project);
partitionIds.forEach(partitionId -> {
val jobParam = new JobParam(Sets.newHashSet(segmentId), null, modelId, getUsername(),
Sets.newHashSet(partitionId), null).withPriority(priority).withYarnQueue(yarnQueue)
@@ -490,14 +493,22 @@ public class ModelBuildService extends AbstractModelService implements ModelBuil
return JobInfoResponse.of(jobIds, JobTypeEnum.SUB_PARTITION_BUILD.toString());
}
- private void checkConcurrentSubmit(int partitionSize) {
- int runningJobLimit = getConfig().getMaxConcurrentJobLimit();
+ /**
+ * Rejects a parallel partition build that submits more than five times the project's
+ * max concurrent job limit in a single request.
+ *
+ * @param partitionSize number of partition jobs about to be submitted
+ * @param project project whose max concurrent job limit applies
+ * @throws KylinException with JOB_CONCURRENT_SUBMIT_LIMIT when partitionSize exceeds the submit limit
+ */
+ private void checkConcurrentSubmit(int partitionSize, String project) {
+ int runningJobLimit = getMaxConcurrentJobLimitByProject(getConfig(), project);
int submitJobLimit = runningJobLimit * 5;
if (partitionSize > submitJobLimit) {
throw new KylinException(JOB_CONCURRENT_SUBMIT_LIMIT, submitJobLimit);
}
}
+ /**
+ * Resolves the maximum number of concurrently running jobs for a project.
+ * Falls back to {@code config.getMaxConcurrentJobLimit()} when the project name is null/empty
+ * or no such project exists; otherwise reads the limit from the project's own config.
+ *
+ * @param config config used for the project lookup and as the fallback
+ * @param project project name, may be null or empty
+ * @return the effective max concurrent job limit
+ */
+ public int getMaxConcurrentJobLimitByProject(KylinConfig config, String project) {
+ // check the name before the lookup so a null/empty project never reaches NProjectManager
+ if (Strings.isNullOrEmpty(project)) {
+ return config.getMaxConcurrentJobLimit();
+ }
+ ProjectInstance prjInstance = NProjectManager.getInstance(config).getProject(project);
+ if (prjInstance == null) {
+ return config.getMaxConcurrentJobLimit();
+ }
+ return prjInstance.getConfig().getMaxConcurrentJobLimit();
+ }
+
@Override
@Transaction(project = 0)
public void refreshSegments(String project, String table, String refreshStart, String refreshEnd,
diff --git a/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/ModelServiceBuildTest.java b/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/ModelServiceBuildTest.java
index 3dad56193f..6c9ea32d8b 100644
--- a/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/ModelServiceBuildTest.java
+++ b/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/ModelServiceBuildTest.java
@@ -39,6 +39,7 @@ import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Locale;
+import java.util.Map;
import java.util.Set;
import java.util.TimeZone;
import java.util.stream.Collectors;
@@ -125,6 +126,7 @@ import org.mockito.Mockito;
import org.springframework.test.util.ReflectionTestUtils;
import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import lombok.val;
@@ -1647,4 +1649,48 @@ public class ModelServiceBuildTest extends SourceTestCase {
Assert.assertThrows(KylinException.class, () -> DateFormat.proposeDateFormat("not_exits"));
}
+ @Test
+ public void testGetMaxConcurrentJobLimitByProject() throws Exception {
+ String project = getProject();
+ val modelId = "b780e4e4-69af-449e-b09f-05c90dfa04b6";
+ val segmentId = "ff839b0b-2c23-4420-b332-0df70e36c343";
+ val buildPartitions = Lists.<String[]> newArrayList();
+ buildPartitions.add(new String[] { "ASIA" });
+ buildPartitions.add(new String[] { "EUROPE" });
+ buildPartitions.add(new String[] { "MIDDLE EAST" });
+ buildPartitions.add(new String[] { "AMERICA" });
+ buildPartitions.add(new String[] { "MOROCCO" });
+ buildPartitions.add(new String[] { "INDONESIA" });
+
+ // system-level limit 1 => at most 1 * 5 = 5 partitions per parallel submit, so 6 are rejected
+ overwriteSystemProp("kylin.job.max-concurrent-jobs", "1");
+ Assert.assertEquals(1,
+ modelBuildService.getMaxConcurrentJobLimitByProject(modelBuildService.getConfig(), project));
+ KylinException e = Assert.assertThrows(KylinException.class,
+ () -> modelBuildService.buildSegmentPartitionByValue(project, modelId, segmentId, buildPartitions, true,
+ false, 0, null, null));
+ Assert.assertEquals(JOB_CONCURRENT_SUBMIT_LIMIT.getMsg(5), e.getMessage());
+ Assert.assertEquals(0, getRunningExecutables(project, modelId).size());
+
+ // project-level limit 2 => up to 10 partitions allowed, so all 6 are submitted
+ val segmentId2 = "d2edf0c5-5eb2-4968-9ad5-09efbf659324";
+ Map<String, String> testOverrideP = Maps.newLinkedHashMap();
+ testOverrideP.put("kylin.job.max-concurrent-jobs", "2");
+ projectService.updateProjectConfig(project, testOverrideP);
+ Assert.assertEquals(2,
+ modelBuildService.getMaxConcurrentJobLimitByProject(modelBuildService.getConfig(), project));
+ // let any exception propagate so a failure reports its real cause
+ modelBuildService.buildSegmentPartitionByValue(project, modelId, segmentId2, buildPartitions, true,
+ false, 0, null, null);
+ Assert.assertEquals(6, getRunningExecutables(project, modelId).size());
+
+ // unknown project falls back to the system-level limit
+ Assert.assertEquals(1,
+ modelBuildService.getMaxConcurrentJobLimitByProject(modelBuildService.getConfig(), "xxxxx"));
+ }
+
}