You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by xx...@apache.org on 2023/01/06 11:10:14 UTC

[kylin] 09/12: KYLIN-5349 Support project-level configuration of concurrent task limits

This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 5bca044690dabb78e18e86890d4283bd67b015ae
Author: Hang Jia <75...@qq.com>
AuthorDate: Thu Oct 27 10:57:59 2022 +0800

    KYLIN-5349 Support project-level configuration of concurrent task limits
---
 .../job/impl/threadpool/NDefaultScheduler.java     | 19 ++++--
 .../apache/kylin/job/runners/FetcherRunner.java    | 27 +++++++-
 .../job/impl/threadpool/NDefaultSchedulerTest.java | 79 +++++++++++++++++++++-
 .../kylin/rest/service/ModelBuildService.java      | 17 ++++-
 .../kylin/rest/service/ModelServiceBuildTest.java  | 46 +++++++++++++
 5 files changed, 177 insertions(+), 11 deletions(-)

diff --git a/src/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/NDefaultScheduler.java b/src/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/NDefaultScheduler.java
index d3efe67bb9..8be23b4c19 100644
--- a/src/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/NDefaultScheduler.java
+++ b/src/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/NDefaultScheduler.java
@@ -32,8 +32,10 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import lombok.Setter;
 import org.apache.commons.lang3.RandomUtils;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.transaction.UnitOfWork;
 import org.apache.kylin.common.util.ExecutorServiceUtil;
 import org.apache.kylin.common.util.NamedThreadFactory;
+import org.apache.kylin.common.util.SystemInfoCollector;
 import org.apache.kylin.job.Scheduler;
 import org.apache.kylin.job.engine.JobEngineConfig;
 import org.apache.kylin.job.execution.AbstractExecutable;
@@ -43,16 +45,17 @@ import org.apache.kylin.job.runners.FetcherRunner;
 import org.apache.kylin.job.runners.JobCheckRunner;
 import org.apache.kylin.job.runners.LicenseCapacityCheckRunner;
 import org.apache.kylin.job.runners.QuotaStorageCheckRunner;
-import org.apache.kylin.common.persistence.transaction.UnitOfWork;
-import org.apache.kylin.common.util.SystemInfoCollector;
-import org.apache.kylin.metadata.epoch.EpochManager;
+import org.apache.kylin.metadata.project.NProjectManager;
+import org.apache.kylin.metadata.project.ProjectInstance;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
+import org.apache.kylin.metadata.epoch.EpochManager;
 import lombok.Getter;
 import lombok.SneakyThrows;
 import lombok.val;
@@ -152,7 +155,7 @@ public class NDefaultScheduler implements Scheduler<AbstractExecutable> {
         //load all executable, set them to a consistent status
         fetcherPool = Executors.newScheduledThreadPool(1,
                 new NamedThreadFactory("FetchJobWorker(project:" + project + ")"));
-        int corePoolSize = jobEngineConfig.getMaxConcurrentJobLimit();
+        int corePoolSize = getMaxConcurrentJobLimitByProject(config, jobEngineConfig, project);
         if (config.getAutoSetConcurrentJob()) {
             val availableMemoryRate = config.getMaxLocalConsumptionRatio();
             synchronized (NDefaultScheduler.class) {
@@ -238,4 +241,12 @@ public class NDefaultScheduler implements Scheduler<AbstractExecutable> {
         return 1.0 * memoryRemaining.availablePermits();
     }
 
+    /** Returns the project's concurrent-job limit, or the engine-wide limit for a null, empty or unknown project. */
+    public int getMaxConcurrentJobLimitByProject(KylinConfig config, JobEngineConfig jobEngineConfig, String project) {
+        ProjectInstance prjInstance = Strings.isNullOrEmpty(project) ? null
+                : NProjectManager.getInstance(config).getProject(project);
+        return prjInstance == null ? jobEngineConfig.getMaxConcurrentJobLimit()
+                : prjInstance.getConfig().getMaxConcurrentJobLimit();
+    }
+
 }
diff --git a/src/core-job/src/main/java/org/apache/kylin/job/runners/FetcherRunner.java b/src/core-job/src/main/java/org/apache/kylin/job/runners/FetcherRunner.java
index ab659c48a8..425a228f74 100644
--- a/src/core-job/src/main/java/org/apache/kylin/job/runners/FetcherRunner.java
+++ b/src/core-job/src/main/java/org/apache/kylin/job/runners/FetcherRunner.java
@@ -20,6 +20,7 @@ package org.apache.kylin.job.runners;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.kylin.common.KylinConfig;
@@ -103,6 +104,8 @@ public class FetcherRunner extends AbstractDefaultSchedulerRunner {
                 reSchedule = false;
                 return;
             }
+            checkAndUpdateJobPoolNum();
+
             val executableManager = NExecutableManager.getInstance(KylinConfig.getInstanceFromEnv(), project);
             Map<String, Executable> runningJobs = context.getRunningJobs();
 
@@ -236,7 +239,9 @@ public class FetcherRunner extends AbstractDefaultSchedulerRunner {
     }
 
     private boolean isJobPoolFull() {
-        if (context.getRunningJobs().size() >= nDefaultScheduler.getJobEngineConfig().getMaxConcurrentJobLimit()) {
+        int corePoolSize = nDefaultScheduler.getMaxConcurrentJobLimitByProject(context.getConfig(),
+                nDefaultScheduler.getJobEngineConfig(), project);
+        if (context.getRunningJobs().size() >= corePoolSize) {
             logger.warn("There are too many jobs running, Job Fetch will wait until next schedule time.");
             return true;
         }
@@ -246,4 +251,24 @@ public class FetcherRunner extends AbstractDefaultSchedulerRunner {
     void scheduleNext() {
         fetcherPool.schedule(this, 0, TimeUnit.SECONDS);
     }
+
+    /** Resizes the job pool to the project's current concurrent-job limit; never shrinks below active workers. */
+    private void checkAndUpdateJobPoolNum() {
+        final ThreadPoolExecutor pool = (ThreadPoolExecutor) jobPool;
+        int currentMax = pool.getMaximumPoolSize();
+        int limit = nDefaultScheduler.getMaxConcurrentJobLimitByProject(context.getConfig(),
+                nDefaultScheduler.getJobEngineConfig(), project);
+        if (limit <= 0 || limit == currentMax) {
+            return; // already in sync, or a non-positive limit that setMaximumPoolSize would reject
+        }
+        if (limit > currentMax) {
+            // Growing: raise the maximum first, because core may not exceed maximum (JDK 9+ throws IAE).
+            pool.setMaximumPoolSize(limit);
+            pool.setCorePoolSize(limit);
+        } else if (pool.getActiveCount() <= limit) {
+            // Shrinking: lower core first for the same reason; skipped while more jobs run than allowed.
+            pool.setCorePoolSize(limit);
+            pool.setMaximumPoolSize(limit);
+        }
+    }
 }
diff --git a/src/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/NDefaultSchedulerTest.java b/src/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/NDefaultSchedulerTest.java
index 77990e29ca..f6602d2cd1 100644
--- a/src/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/NDefaultSchedulerTest.java
+++ b/src/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/NDefaultSchedulerTest.java
@@ -71,10 +71,10 @@ import org.apache.kylin.metadata.cube.model.NDataflow;
 import org.apache.kylin.metadata.cube.model.NDataflowManager;
 import org.apache.kylin.metadata.cube.model.NDataflowUpdate;
 import org.apache.kylin.metadata.cube.model.NIndexPlanManager;
-import org.apache.kylin.metadata.epoch.EpochManager;
 import org.apache.kylin.metadata.model.ManagementType;
 import org.apache.kylin.metadata.model.NDataModelManager;
 import org.apache.kylin.metadata.project.EnhancedUnitOfWork;
+import org.apache.kylin.metadata.project.NProjectManager;
 import org.apache.kylin.metadata.realization.RealizationStatusEnum;
 import org.assertj.core.api.Assertions;
 import org.awaitility.core.ConditionTimeoutException;
@@ -93,6 +93,7 @@ import org.slf4j.LoggerFactory;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 
+import org.apache.kylin.metadata.epoch.EpochManager;
 import lombok.val;
 import lombok.var;
 
@@ -940,8 +941,7 @@ public class NDefaultSchedulerTest extends BaseSchedulerTest {
         Assert.assertEquals(RealizationStatusEnum.ONLINE, updateDf.getStatus());
     }
 
-    private DefaultExecutable testDataflowStatusWhenJobError(ManagementType tableOriented,
-            JobTypeEnum indexBuild) {
+    private DefaultExecutable testDataflowStatusWhenJobError(ManagementType tableOriented, JobTypeEnum indexBuild) {
         val dfMgr = NDataflowManager.getInstance(getTestConfig(), project);
         val modelMgr = NDataModelManager.getInstance(getTestConfig(), project);
         modelMgr.updateDataModel("89af4ee2-2cdb-4b07-b39e-4c29856309aa", copyForWrite -> {
@@ -2026,4 +2026,77 @@ public class NDefaultSchedulerTest extends BaseSchedulerTest {
             scheduler.getContext().setReachQuotaLimit(false);
         }
     }
+
+    @Test
+    @Repeat(3)
+    public void testProjectConcurrentJobLimit() {
+        String project = "heterogeneous_segment";
+        String modelId = "747f864b-9721-4b97-acde-0aa8e8656cba";
+        KylinConfig config = KylinConfig.getInstanceFromEnv();
+        config.setProperty("kylin.job.max-concurrent-jobs", "1");
+        config.setProperty("kylin.engine.driver-memory-base", "512");
+        // Checks that changing "kylin.job.max-concurrent-jobs" on the project config affects a started scheduler.
+        val scheduler = NDefaultScheduler.getInstance(project);
+        val originExecutableManager = NExecutableManager.getInstance(KylinConfig.getInstanceFromEnv(), project);
+        val executableManager = Mockito.spy(originExecutableManager);
+        executableManager.deleteAllJob();
+        Mockito.doAnswer(invocation -> {
+            String jobId = invocation.getArgument(0);
+            originExecutableManager.destroyProcess(jobId);
+            return null;
+        }).when(executableManager).destroyProcess(Mockito.anyString());
+        // Start the scheduler with the config prepared above (limit 1).
+        scheduler.init(new JobEngineConfig(config));
+        val projectManager = NProjectManager.getInstance(config);
+        // Phase 1 (limit 1): the earlier job should be RUNNING while the later one stays READY.
+        if (!scheduler.hasStarted()) {
+            throw new RuntimeException("scheduler has not been started");
+        }
+        int memory = NDefaultScheduler.getMemoryRemaining().availablePermits();
+        val df = NDataflowManager.getInstance(getTestConfig(), project).getDataflow(modelId);
+        val job1 = generateJob(df, project);
+        val job2 = generatePartial(df, project);
+        executableManager.addJob(job1);
+        executableManager.addJob(job2);
+        waitForJobByStatus(job1.getId(), 60000, ExecutableState.RUNNING, executableManager);
+        Assert.assertNotEquals(memory, NDefaultScheduler.getMemoryRemaining().availablePermits());
+        var runningExecutables = executableManager.getRunningExecutables(project, modelId);
+        runningExecutables.sort(Comparator.comparing(AbstractExecutable::getCreateTime));
+        Assert.assertEquals(ExecutableState.RUNNING, runningExecutables.get(0).getStatus());
+        Assert.assertEquals(ExecutableState.READY, runningExecutables.get(1).getStatus());
+        // Phase 2 (project limit raised to 2): job1 and job2 should both be RUNNING, the new job3 READY.
+        projectManager.getProject(project).getConfig().setProperty("kylin.job.max-concurrent-jobs", "2");
+        Assert.assertNotEquals(memory, NDefaultScheduler.getMemoryRemaining().availablePermits());
+        val job3 = generateJob(df, project);
+        executableManager.addJob(job3);
+        waitForJobByStatus(job1.getId(), 60000, ExecutableState.RUNNING, executableManager);
+        waitForJobByStatus(job2.getId(), 60000, ExecutableState.RUNNING, executableManager);
+        runningExecutables = executableManager.getRunningExecutables(project, modelId);
+        runningExecutables.sort(Comparator.comparing(AbstractExecutable::getCreateTime));
+        Assert.assertEquals(ExecutableState.RUNNING, runningExecutables.get(0).getStatus());
+        Assert.assertEquals(ExecutableState.RUNNING, runningExecutables.get(1).getStatus());
+        Assert.assertEquals(ExecutableState.READY, runningExecutables.get(2).getStatus());
+        // Phase 3 (project limit lowered back to 1): jobs that are already RUNNING are expected to stay RUNNING.
+        projectManager.getProject(project).getConfig().setProperty("kylin.job.max-concurrent-jobs", "1");
+        waitForJobByStatus(job1.getId(), 60000, ExecutableState.RUNNING, executableManager);
+        waitForJobByStatus(job2.getId(), 60000, ExecutableState.RUNNING, executableManager);
+        // NOTE(review): runningExecutables is not re-fetched here; the list obtained in phase 2 is checked again.
+        runningExecutables.sort(Comparator.comparing(AbstractExecutable::getCreateTime));
+        Assert.assertEquals(ExecutableState.RUNNING, runningExecutables.get(0).getStatus());
+        Assert.assertEquals(ExecutableState.RUNNING, runningExecutables.get(1).getStatus());
+        Assert.assertEquals(ExecutableState.READY, runningExecutables.get(2).getStatus());
+        // NOTE(review): a null status presumably waits for job1 to finish (confirm); then 2 jobs remain, 1 RUNNING.
+        waitForJobByStatus(job1.getId(), 60000, null, executableManager);
+        runningExecutables = executableManager.getRunningExecutables(project, modelId);
+        Assert.assertEquals(2, runningExecutables.size());
+        runningExecutables.sort(Comparator.comparing(AbstractExecutable::getCreateTime));
+        Assert.assertEquals(ExecutableState.RUNNING, runningExecutables.get(0).getStatus());
+        Assert.assertEquals(ExecutableState.READY, runningExecutables.get(1).getStatus());
+
+        scheduler.shutdown();
+        Assert.assertEquals(memory, NDefaultScheduler.getMemoryRemaining().availablePermits());
+        // "xxxxx" is not expected to exist as a project, so the limit set on config at the top (1) is returned.
+        Assert.assertEquals(1,
+                scheduler.getMaxConcurrentJobLimitByProject(config, scheduler.getJobEngineConfig(), "xxxxx"));
+    }
 }
diff --git a/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/ModelBuildService.java b/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/ModelBuildService.java
index 5548fd30d5..93724c8c26 100644
--- a/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/ModelBuildService.java
+++ b/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/ModelBuildService.java
@@ -66,6 +66,8 @@ import org.apache.kylin.metadata.model.SegmentStatusEnumToDisplay;
 import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.metadata.model.util.MultiPartitionUtil;
 import org.apache.kylin.metadata.project.EnhancedUnitOfWork;
+import org.apache.kylin.metadata.project.NProjectManager;
+import org.apache.kylin.metadata.project.ProjectInstance;
 import org.apache.kylin.metadata.sourceusage.SourceUsageManager;
 import org.apache.kylin.query.util.PushDownUtil;
 import org.apache.kylin.rest.aspect.Transaction;
@@ -85,6 +87,7 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
 import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 
@@ -471,7 +474,7 @@ public class ModelBuildService extends AbstractModelService implements ModelBuil
             String segmentId, Set<Long> partitionIds, int priority, String yarnQueue, Object tag) {
         val jobIds = Lists.<String> newArrayList();
         if (parallelBuild) {
-            checkConcurrentSubmit(partitionIds.size());
+            checkConcurrentSubmit(partitionIds.size(), project);
             partitionIds.forEach(partitionId -> {
                 val jobParam = new JobParam(Sets.newHashSet(segmentId), null, modelId, getUsername(),
                         Sets.newHashSet(partitionId), null).withPriority(priority).withYarnQueue(yarnQueue)
@@ -490,14 +493,22 @@ public class ModelBuildService extends AbstractModelService implements ModelBuil
         return JobInfoResponse.of(jobIds, JobTypeEnum.SUB_PARTITION_BUILD.toString());
     }
 
-    private void checkConcurrentSubmit(int partitionSize) {
-        int runningJobLimit = getConfig().getMaxConcurrentJobLimit();
+    private void checkConcurrentSubmit(int partitionSize, String project) {
+        int runningJobLimit = getMaxConcurrentJobLimitByProject(getConfig(), project);
         int submitJobLimit = runningJobLimit * 5;
         if (partitionSize > submitJobLimit) {
             throw new KylinException(JOB_CONCURRENT_SUBMIT_LIMIT, submitJobLimit);
         }
     }
 
+    /** Returns the project's concurrent-job limit, or config's own limit for a null, empty or unknown project. */
+    public int getMaxConcurrentJobLimitByProject(KylinConfig config, String project) {
+        ProjectInstance prjInstance = Strings.isNullOrEmpty(project) ? null
+                : NProjectManager.getInstance(config).getProject(project);
+        return prjInstance == null ? config.getMaxConcurrentJobLimit()
+                : prjInstance.getConfig().getMaxConcurrentJobLimit();
+    }
+
     @Override
     @Transaction(project = 0)
     public void refreshSegments(String project, String table, String refreshStart, String refreshEnd,
diff --git a/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/ModelServiceBuildTest.java b/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/ModelServiceBuildTest.java
index 3dad56193f..6c9ea32d8b 100644
--- a/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/ModelServiceBuildTest.java
+++ b/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/ModelServiceBuildTest.java
@@ -39,6 +39,7 @@ import java.util.Arrays;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Locale;
+import java.util.Map;
 import java.util.Set;
 import java.util.TimeZone;
 import java.util.stream.Collectors;
@@ -125,6 +126,7 @@ import org.mockito.Mockito;
 import org.springframework.test.util.ReflectionTestUtils;
 
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
 import lombok.val;
@@ -1647,4 +1649,48 @@ public class ModelServiceBuildTest extends SourceTestCase {
         Assert.assertThrows(KylinException.class, () -> DateFormat.proposeDateFormat("not_exits"));
     }
 
+    @Test
+    public void testGetMaxConcurrentJobLimitByProject() {
+        String project = getProject();
+        val modelId = "b780e4e4-69af-449e-b09f-05c90dfa04b6";
+        val segmentId = "ff839b0b-2c23-4420-b332-0df70e36c343";
+        val buildPartitions = Lists.<String[]> newArrayList();
+        buildPartitions.add(new String[] { "ASIA" });
+        buildPartitions.add(new String[] { "EUROPE" });
+        buildPartitions.add(new String[] { "MIDDLE EAST" });
+        buildPartitions.add(new String[] { "AMERICA" });
+        buildPartitions.add(new String[] { "MOROCCO" });
+        buildPartitions.add(new String[] { "INDONESIA" });
+        // With a limit of 1 the submit cap is 1 * 5 = 5, so a parallel build of 6 partitions is expected to fail.
+        overwriteSystemProp("kylin.job.max-concurrent-jobs", "1");
+        Assert.assertEquals(1,
+                modelBuildService.getMaxConcurrentJobLimitByProject(modelBuildService.getConfig(), project));
+        try {
+            modelBuildService.buildSegmentPartitionByValue(getProject(), modelId, segmentId, buildPartitions, true,
+                    false, 0, null, null);
+            Assert.fail();
+        } catch (Exception e) {
+            Assert.assertTrue(e instanceof KylinException);
+            Assert.assertEquals(JOB_CONCURRENT_SUBMIT_LIMIT.getMsg(5), e.getMessage());
+            Assert.assertEquals(0, getRunningExecutables(getProject(), modelId).size());
+        }
+        // Override the limit to 2 on the project (cap 2 * 5 = 10): the same 6 partitions should now be accepted.
+        val segmentId2 = "d2edf0c5-5eb2-4968-9ad5-09efbf659324";
+        Map<String, String> testOverrideP = Maps.newLinkedHashMap();
+        testOverrideP.put("kylin.job.max-concurrent-jobs", "2");
+        projectService.updateProjectConfig(project, testOverrideP);
+        Assert.assertEquals(2,
+                modelBuildService.getMaxConcurrentJobLimitByProject(modelBuildService.getConfig(), project));
+        try {
+            modelBuildService.buildSegmentPartitionByValue(getProject(), modelId, segmentId2, buildPartitions, true,
+                    false, 0, null, null);
+        } catch (Exception e) {
+            Assert.fail(); // NOTE(review): discards e; letting it propagate would keep the stack trace
+        }
+        Assert.assertEquals(6, getRunningExecutables(getProject(), modelId).size());
+        // "xxxxx" is not expected to exist as a project, so the limit of the service config (1) is returned.
+        Assert.assertEquals(1,
+                modelBuildService.getMaxConcurrentJobLimitByProject(modelBuildService.getConfig(), "xxxxx"));
+    }
+
 }