You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2022/09/12 17:57:33 UTC
[pinot] branch master updated: skip late cron job with max allowed delay (#9372)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 45cc5dd0e1 skip late cron job with max allowed delay (#9372)
45cc5dd0e1 is described below
commit 45cc5dd0e155578e9c3acd3efeb61cec239838cf
Author: Xiaobing <61...@users.noreply.github.com>
AuthorDate: Mon Sep 12 10:57:27 2022 -0700
skip late cron job with max allowed delay (#9372)
---
.../configs/controller.yml | 6 ++++
.../pinot/common/metrics/ControllerMeter.java | 1 +
.../apache/pinot/controller/ControllerConf.java | 11 ++++++
.../helix/core/minion/CronJobScheduleJob.java | 21 ++++++++++-
.../helix/core/minion/PinotTaskManager.java | 9 ++++-
.../core/minion/PinotTaskManagerStatelessTest.java | 42 ++++++++++++++++++++++
6 files changed, 88 insertions(+), 2 deletions(-)
diff --git a/docker/images/pinot/etc/jmx_prometheus_javaagent/configs/controller.yml b/docker/images/pinot/etc/jmx_prometheus_javaagent/configs/controller.yml
index 4c75003223..c44dbf3937 100644
--- a/docker/images/pinot/etc/jmx_prometheus_javaagent/configs/controller.yml
+++ b/docker/images/pinot/etc/jmx_prometheus_javaagent/configs/controller.yml
@@ -94,6 +94,12 @@ rules:
labels:
table: "$1"
taskType: "$2"
+- pattern: "\"org.apache.pinot.common.metrics\"<type=\"ControllerMetrics\", name=\"pinot.controller.(\\w+)\\.(\\w+).cronSchedulerJobSkipped\"><>(\\w+)"
+ name: "pinot_controller_cronSchedulerJobSkipped_$3"
+ cache: true
+ labels:
+ table: "$1"
+ taskType: "$2"
- pattern: "\"org.apache.pinot.common.metrics\"<type=\"ControllerMetrics\", name=\"pinot.controller.(\\w+)\\.(\\w+).cronSchedulerJobExecutionTimeMs\"><>(\\w+)"
name: "pinot_controller_cronSchedulerJobExecutionTimeMs_$3"
cache: true
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMeter.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMeter.java
index e4c634cf30..edd70ee642 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMeter.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMeter.java
@@ -55,6 +55,7 @@ public enum ControllerMeter implements AbstractMetrics.Meter {
NUMBER_END_REPLACE_FAILURE("NumEndReplaceFailure", false),
NUMBER_REVERT_REPLACE_FAILURE("NumRevertReplaceFailure", false),
CRON_SCHEDULER_JOB_TRIGGERED("cronSchedulerJobTriggered", false),
+ CRON_SCHEDULER_JOB_SKIPPED("cronSchedulerJobSkipped", false),
LLC_SEGMENTS_DEEP_STORE_UPLOAD_RETRY_ERROR("LLCSegmentDeepStoreUploadRetryError", false),
NUMBER_ADHOC_TASKS_SUBMITTED("adhocTasks", false);
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
index ff7a987d57..e03a7895bb 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
@@ -123,6 +123,9 @@ public class ControllerConf extends PinotConfiguration {
@Deprecated
public static final String DEPRECATED_TASK_MANAGER_FREQUENCY_IN_SECONDS = "controller.task.frequencyInSeconds";
public static final String TASK_MANAGER_FREQUENCY_PERIOD = "controller.task.frequencyPeriod";
+ public static final String TASK_MANAGER_SKIP_LATE_CRON_SCHEDULE = "controller.task.skipLateCronSchedule";
+ public static final String TASK_MANAGER_MAX_CRON_SCHEDULE_DELAY_IN_SECONDS =
+ "controller.task.maxCronScheduleDelayInSeconds";
// Deprecated as of 0.8.0
@Deprecated
public static final String DEPRECATED_MINION_INSTANCES_CLEANUP_TASK_FREQUENCY_IN_SECONDS =
@@ -654,6 +657,14 @@ public class ControllerConf extends PinotConfiguration {
setProperty(DELETED_SEGMENTS_RETENTION_IN_DAYS, retentionInDays);
}
+ public boolean isSkipLateCronSchedule() {
+ return getProperty(ControllerPeriodicTasksConf.TASK_MANAGER_SKIP_LATE_CRON_SCHEDULE, false);
+ }
+
+ public int getMaxCronScheduleDelayInSeconds() {
+ return getProperty(ControllerPeriodicTasksConf.TASK_MANAGER_MAX_CRON_SCHEDULE_DELAY_IN_SECONDS, 600);
+ }
+
public int getTaskManagerFrequencyInSeconds() {
return Optional.ofNullable(getProperty(ControllerPeriodicTasksConf.TASK_MANAGER_FREQUENCY_PERIOD))
.map(period -> (int) convertPeriodToSeconds(period)).orElseGet(
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/CronJobScheduleJob.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/CronJobScheduleJob.java
index 1a250dcd41..8c0433854f 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/CronJobScheduleJob.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/CronJobScheduleJob.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.controller.helix.core.minion;
+import java.util.Date;
import java.util.concurrent.TimeUnit;
import org.apache.pinot.common.metrics.ControllerMeter;
import org.apache.pinot.common.metrics.ControllerTimer;
@@ -43,13 +44,27 @@ public class CronJobScheduleJob implements Job {
LeadControllerManager leadControllerManager =
(LeadControllerManager) jobExecutionContext.getJobDetail().getJobDataMap()
.get(PinotTaskManager.LEAD_CONTROLLER_MANAGER_KEY);
+ Boolean skipLateCronSchedule =
+ (Boolean) jobExecutionContext.getJobDetail().getJobDataMap().get(PinotTaskManager.SKIP_LATE_CRON_SCHEDULE);
+ int maxDelayInSeconds = (Integer) jobExecutionContext.getJobDetail().getJobDataMap()
+ .get(PinotTaskManager.MAX_CRON_SCHEDULE_DELAY_IN_SECONDS);
String table = jobExecutionContext.getJobDetail().getKey().getName();
String taskType = jobExecutionContext.getJobDetail().getKey().getGroup();
pinotTaskManager.getControllerMetrics().addMeteredTableValue(PinotTaskManager.getCronJobName(table, taskType),
ControllerMeter.CRON_SCHEDULER_JOB_TRIGGERED, 1L);
if (leadControllerManager.isLeaderForTable(table)) {
+ Date fireTime = jobExecutionContext.getFireTime();
+ LOGGER.info("Execute CronJob: table - {}, task - {} at {}", table, taskType, fireTime);
+ Date scheduledFireTime = jobExecutionContext.getScheduledFireTime();
+ if (skipLateCronSchedule && isCronScheduleLate(fireTime, scheduledFireTime, maxDelayInSeconds)) {
+ LOGGER.warn(
+ "Skip late CronJob: table - {}, task - {} fired at {} but expected at {} with allowed delayInSeconds: {}",
+ table, taskType, fireTime, scheduledFireTime, maxDelayInSeconds);
+ pinotTaskManager.getControllerMetrics().addMeteredTableValue(PinotTaskManager.getCronJobName(table, taskType),
+ ControllerMeter.CRON_SCHEDULER_JOB_SKIPPED, 1L);
+ return;
+ }
long jobStartTime = System.currentTimeMillis();
- LOGGER.info("Execute CronJob: table - {}, task - {} at {}", table, taskType, jobExecutionContext.getFireTime());
pinotTaskManager.scheduleTask(taskType, table);
LOGGER.info("Finished CronJob: table - {}, task - {}, next runtime is {}", table, taskType,
jobExecutionContext.getNextFireTime());
@@ -60,4 +75,8 @@ public class CronJobScheduleJob implements Job {
LOGGER.info("Not Lead, skip processing CronJob: table - {}, task - {}", table, taskType);
}
}
+
+ private boolean isCronScheduleLate(Date fireTime, Date scheduledFireTime, long maxDelayInSeconds) {
+ return fireTime.getTime() - scheduledFireTime.getTime() > maxDelayInSeconds * 1000;
+ }
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
index d7e5d5afc7..b6bc4b6bb0 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
@@ -83,6 +83,8 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> {
private static final Logger LOGGER = LoggerFactory.getLogger(PinotTaskManager.class);
public final static String PINOT_TASK_MANAGER_KEY = "PinotTaskManager";
+ public final static String SKIP_LATE_CRON_SCHEDULE = "SkipLateCronSchedule";
+ public final static String MAX_CRON_SCHEDULE_DELAY_IN_SECONDS = "MaxCronScheduleDelayInSeconds";
public final static String LEAD_CONTROLLER_MANAGER_KEY = "LeadControllerManager";
public final static String SCHEDULE_KEY = "schedule";
@@ -96,6 +98,8 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> {
// For cron-based scheduling
private final Scheduler _scheduler;
+ private final boolean _skipLateCronSchedule;
+ private final int _maxCronScheduleDelayInSeconds;
private final Map<String, Map<String, String>> _tableTaskTypeToCronExpressionMap = new ConcurrentHashMap<>();
private final Map<String, TableTaskSchedulerUpdater> _tableTaskSchedulerUpdaterMap = new ConcurrentHashMap<>();
@@ -120,7 +124,8 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> {
new ClusterInfoAccessor(helixResourceManager, helixTaskResourceManager, controllerConf, controllerMetrics,
leadControllerManager);
_taskGeneratorRegistry = new TaskGeneratorRegistry(_clusterInfoAccessor);
-
+ _skipLateCronSchedule = controllerConf.isSkipLateCronSchedule();
+ _maxCronScheduleDelayInSeconds = controllerConf.getMaxCronScheduleDelayInSeconds();
if (controllerConf.isPinotTaskManagerSchedulerEnabled()) {
try {
_scheduler = new StdSchedulerFactory().getScheduler();
@@ -412,6 +417,8 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> {
JobDataMap jobDataMap = new JobDataMap();
jobDataMap.put(PINOT_TASK_MANAGER_KEY, this);
jobDataMap.put(LEAD_CONTROLLER_MANAGER_KEY, _leadControllerManager);
+ jobDataMap.put(SKIP_LATE_CRON_SCHEDULE, _skipLateCronSchedule);
+ jobDataMap.put(MAX_CRON_SCHEDULE_DELAY_IN_SECONDS, _maxCronScheduleDelayInSeconds);
JobDetail jobDetail =
JobBuilder.newJob(CronJobScheduleJob.class).withIdentity(tableWithType, taskType).setJobData(jobDataMap)
.build();
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManagerStatelessTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManagerStatelessTest.java
index cf41d1114e..6b6e0406ef 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManagerStatelessTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManagerStatelessTest.java
@@ -72,6 +72,41 @@ public class PinotTaskManagerStatelessTest extends ControllerTest {
stopController();
}
+ @Test
+ public void testSkipLateCronSchedule()
+ throws Exception {
+ Map<String, Object> properties = getDefaultControllerConfiguration();
+ properties.put(ControllerConf.ControllerPeriodicTasksConf.PINOT_TASK_MANAGER_SCHEDULER_ENABLED, true);
+ properties.put(ControllerConf.ControllerPeriodicTasksConf.TASK_MANAGER_SKIP_LATE_CRON_SCHEDULE, "true");
+ properties.put(ControllerConf.ControllerPeriodicTasksConf.TASK_MANAGER_MAX_CRON_SCHEDULE_DELAY_IN_SECONDS, "10");
+ startController(properties);
+ addFakeBrokerInstancesToAutoJoinHelixCluster(1, true);
+ addFakeServerInstancesToAutoJoinHelixCluster(1, true);
+ Schema schema = new Schema.SchemaBuilder().setSchemaName(RAW_TABLE_NAME)
+ .addSingleValueDimension("myMap", FieldSpec.DataType.STRING)
+ .addSingleValueDimension("myMapStr", FieldSpec.DataType.STRING)
+ .addSingleValueDimension("complexMapStr", FieldSpec.DataType.STRING).build();
+ addSchema(schema);
+ PinotTaskManager taskManager = _controllerStarter.getTaskManager();
+ Scheduler scheduler = taskManager.getScheduler();
+ assertNotNull(scheduler);
+
+ // Add Table with one task.
+ TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setTaskConfig(
+ new TableTaskConfig(
+ ImmutableMap.of("SegmentGenerationAndPushTask", ImmutableMap.of("schedule", "0 */10 * ? * * *")))).build();
+ addTableConfig(tableConfig);
+ waitForJobGroupNames(_controllerStarter.getTaskManager(),
+ jgn -> jgn.size() == 1 && jgn.contains(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE),
+ "JobGroupNames should have SegmentGenerationAndPushTask only");
+ validateJob(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE, "0 */10 * ? * * *", true, 10);
+
+ dropOfflineTable(RAW_TABLE_NAME);
+ waitForJobGroupNames(_controllerStarter.getTaskManager(), List::isEmpty, "JobGroupNames should be empty");
+ stopFakeInstances();
+ stopController();
+ }
+
@Test
public void testPinotTaskManagerSchedulerWithUpdate()
throws Exception {
@@ -219,6 +254,11 @@ public class PinotTaskManagerStatelessTest extends ControllerTest {
private void validateJob(String taskType, String cronExpression)
throws Exception {
+ validateJob(taskType, cronExpression, false, 600);
+ }
+
+ private void validateJob(String taskType, String cronExpression, boolean skipLateCronSchedule, int maxDelayInSeconds)
+ throws Exception {
PinotTaskManager taskManager = _controllerStarter.getTaskManager();
Scheduler scheduler = taskManager.getScheduler();
assert scheduler != null;
@@ -231,6 +271,8 @@ public class PinotTaskManagerStatelessTest extends ControllerTest {
assertEquals(jobDetail.getKey().getGroup(), taskType);
assertSame(jobDetail.getJobDataMap().get("PinotTaskManager"), taskManager);
assertSame(jobDetail.getJobDataMap().get("LeadControllerManager"), _controllerStarter.getLeadControllerManager());
+ assertEquals(jobDetail.getJobDataMap().get("SkipLateCronSchedule"), skipLateCronSchedule);
+ assertEquals(jobDetail.getJobDataMap().get("MaxCronScheduleDelayInSeconds"), maxDelayInSeconds);
// jobDetail and jobTrigger are not added atomically by the scheduler,
// the jobDetail is added to an internal map firstly, and jobTrigger
// is added to another internal map afterwards, so we check for the existence
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org