You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2022/09/12 17:57:33 UTC

[pinot] branch master updated: skip late cron job with max allowed delay (#9372)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 45cc5dd0e1 skip late cron job with max allowed delay (#9372)
45cc5dd0e1 is described below

commit 45cc5dd0e155578e9c3acd3efeb61cec239838cf
Author: Xiaobing <61...@users.noreply.github.com>
AuthorDate: Mon Sep 12 10:57:27 2022 -0700

    skip late cron job with max allowed delay (#9372)
---
 .../configs/controller.yml                         |  6 ++++
 .../pinot/common/metrics/ControllerMeter.java      |  1 +
 .../apache/pinot/controller/ControllerConf.java    | 11 ++++++
 .../helix/core/minion/CronJobScheduleJob.java      | 21 ++++++++++-
 .../helix/core/minion/PinotTaskManager.java        |  9 ++++-
 .../core/minion/PinotTaskManagerStatelessTest.java | 42 ++++++++++++++++++++++
 6 files changed, 88 insertions(+), 2 deletions(-)

diff --git a/docker/images/pinot/etc/jmx_prometheus_javaagent/configs/controller.yml b/docker/images/pinot/etc/jmx_prometheus_javaagent/configs/controller.yml
index 4c75003223..c44dbf3937 100644
--- a/docker/images/pinot/etc/jmx_prometheus_javaagent/configs/controller.yml
+++ b/docker/images/pinot/etc/jmx_prometheus_javaagent/configs/controller.yml
@@ -94,6 +94,12 @@ rules:
   labels:
     table: "$1"
     taskType: "$2"
+- pattern: "\"org.apache.pinot.common.metrics\"<type=\"ControllerMetrics\", name=\"pinot.controller.(\\w+)\\.(\\w+).cronSchedulerJobSkipped\"><>(\\w+)"
+  name: "pinot_controller_cronSchedulerJobSkipped_$3"
+  cache: true
+  labels:
+    table: "$1"
+    taskType: "$2"
 - pattern: "\"org.apache.pinot.common.metrics\"<type=\"ControllerMetrics\", name=\"pinot.controller.(\\w+)\\.(\\w+).cronSchedulerJobExecutionTimeMs\"><>(\\w+)"
   name: "pinot_controller_cronSchedulerJobExecutionTimeMs_$3"
   cache: true
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMeter.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMeter.java
index e4c634cf30..edd70ee642 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMeter.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMeter.java
@@ -55,6 +55,7 @@ public enum ControllerMeter implements AbstractMetrics.Meter {
   NUMBER_END_REPLACE_FAILURE("NumEndReplaceFailure", false),
   NUMBER_REVERT_REPLACE_FAILURE("NumRevertReplaceFailure", false),
   CRON_SCHEDULER_JOB_TRIGGERED("cronSchedulerJobTriggered", false),
+  CRON_SCHEDULER_JOB_SKIPPED("cronSchedulerJobSkipped", false),
   LLC_SEGMENTS_DEEP_STORE_UPLOAD_RETRY_ERROR("LLCSegmentDeepStoreUploadRetryError", false),
   NUMBER_ADHOC_TASKS_SUBMITTED("adhocTasks", false);
 
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
index ff7a987d57..e03a7895bb 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
@@ -123,6 +123,9 @@ public class ControllerConf extends PinotConfiguration {
     @Deprecated
     public static final String DEPRECATED_TASK_MANAGER_FREQUENCY_IN_SECONDS = "controller.task.frequencyInSeconds";
     public static final String TASK_MANAGER_FREQUENCY_PERIOD = "controller.task.frequencyPeriod";
+    public static final String TASK_MANAGER_SKIP_LATE_CRON_SCHEDULE = "controller.task.skipLateCronSchedule";
+    public static final String TASK_MANAGER_MAX_CRON_SCHEDULE_DELAY_IN_SECONDS =
+        "controller.task.maxCronScheduleDelayInSeconds";
     // Deprecated as of 0.8.0
     @Deprecated
     public static final String DEPRECATED_MINION_INSTANCES_CLEANUP_TASK_FREQUENCY_IN_SECONDS =
@@ -654,6 +657,14 @@ public class ControllerConf extends PinotConfiguration {
     setProperty(DELETED_SEGMENTS_RETENTION_IN_DAYS, retentionInDays);
   }
 
+  public boolean isSkipLateCronSchedule() {
+    return getProperty(ControllerPeriodicTasksConf.TASK_MANAGER_SKIP_LATE_CRON_SCHEDULE, false);
+  }
+
+  public int getMaxCronScheduleDelayInSeconds() {
+    return getProperty(ControllerPeriodicTasksConf.TASK_MANAGER_MAX_CRON_SCHEDULE_DELAY_IN_SECONDS, 600);
+  }
+
   public int getTaskManagerFrequencyInSeconds() {
     return Optional.ofNullable(getProperty(ControllerPeriodicTasksConf.TASK_MANAGER_FREQUENCY_PERIOD))
         .map(period -> (int) convertPeriodToSeconds(period)).orElseGet(
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/CronJobScheduleJob.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/CronJobScheduleJob.java
index 1a250dcd41..8c0433854f 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/CronJobScheduleJob.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/CronJobScheduleJob.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.controller.helix.core.minion;
 
+import java.util.Date;
 import java.util.concurrent.TimeUnit;
 import org.apache.pinot.common.metrics.ControllerMeter;
 import org.apache.pinot.common.metrics.ControllerTimer;
@@ -43,13 +44,27 @@ public class CronJobScheduleJob implements Job {
     LeadControllerManager leadControllerManager =
         (LeadControllerManager) jobExecutionContext.getJobDetail().getJobDataMap()
             .get(PinotTaskManager.LEAD_CONTROLLER_MANAGER_KEY);
+    Boolean skipLateCronSchedule =
+        (Boolean) jobExecutionContext.getJobDetail().getJobDataMap().get(PinotTaskManager.SKIP_LATE_CRON_SCHEDULE);
+    int maxDelayInSeconds = (Integer) jobExecutionContext.getJobDetail().getJobDataMap()
+        .get(PinotTaskManager.MAX_CRON_SCHEDULE_DELAY_IN_SECONDS);
     String table = jobExecutionContext.getJobDetail().getKey().getName();
     String taskType = jobExecutionContext.getJobDetail().getKey().getGroup();
     pinotTaskManager.getControllerMetrics().addMeteredTableValue(PinotTaskManager.getCronJobName(table, taskType),
         ControllerMeter.CRON_SCHEDULER_JOB_TRIGGERED, 1L);
     if (leadControllerManager.isLeaderForTable(table)) {
+      Date fireTime = jobExecutionContext.getFireTime();
+      LOGGER.info("Execute CronJob: table - {}, task - {} at {}", table, taskType, fireTime);
+      Date scheduledFireTime = jobExecutionContext.getScheduledFireTime();
+      if (skipLateCronSchedule && isCronScheduleLate(fireTime, scheduledFireTime, maxDelayInSeconds)) {
+        LOGGER.warn(
+            "Skip late CronJob: table - {}, task - {} fired at {} but expected at {} with allowed delayInSeconds: {}",
+            table, taskType, fireTime, scheduledFireTime, maxDelayInSeconds);
+        pinotTaskManager.getControllerMetrics().addMeteredTableValue(PinotTaskManager.getCronJobName(table, taskType),
+            ControllerMeter.CRON_SCHEDULER_JOB_SKIPPED, 1L);
+        return;
+      }
       long jobStartTime = System.currentTimeMillis();
-      LOGGER.info("Execute CronJob: table - {}, task - {} at {}", table, taskType, jobExecutionContext.getFireTime());
       pinotTaskManager.scheduleTask(taskType, table);
       LOGGER.info("Finished CronJob: table - {}, task - {}, next runtime is {}", table, taskType,
           jobExecutionContext.getNextFireTime());
@@ -60,4 +75,8 @@ public class CronJobScheduleJob implements Job {
       LOGGER.info("Not Lead, skip processing CronJob: table - {}, task - {}", table, taskType);
     }
   }
+
+  private boolean isCronScheduleLate(Date fireTime, Date scheduledFireTime, long maxDelayInSeconds) {
+    return fireTime.getTime() - scheduledFireTime.getTime() > maxDelayInSeconds * 1000;
+  }
 }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
index d7e5d5afc7..b6bc4b6bb0 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
@@ -83,6 +83,8 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> {
   private static final Logger LOGGER = LoggerFactory.getLogger(PinotTaskManager.class);
 
   public final static String PINOT_TASK_MANAGER_KEY = "PinotTaskManager";
+  public final static String SKIP_LATE_CRON_SCHEDULE = "SkipLateCronSchedule";
+  public final static String MAX_CRON_SCHEDULE_DELAY_IN_SECONDS = "MaxCronScheduleDelayInSeconds";
   public final static String LEAD_CONTROLLER_MANAGER_KEY = "LeadControllerManager";
   public final static String SCHEDULE_KEY = "schedule";
 
@@ -96,6 +98,8 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> {
 
   // For cron-based scheduling
   private final Scheduler _scheduler;
+  private final boolean _skipLateCronSchedule;
+  private final int _maxCronScheduleDelayInSeconds;
   private final Map<String, Map<String, String>> _tableTaskTypeToCronExpressionMap = new ConcurrentHashMap<>();
   private final Map<String, TableTaskSchedulerUpdater> _tableTaskSchedulerUpdaterMap = new ConcurrentHashMap<>();
 
@@ -120,7 +124,8 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> {
         new ClusterInfoAccessor(helixResourceManager, helixTaskResourceManager, controllerConf, controllerMetrics,
             leadControllerManager);
     _taskGeneratorRegistry = new TaskGeneratorRegistry(_clusterInfoAccessor);
-
+    _skipLateCronSchedule = controllerConf.isSkipLateCronSchedule();
+    _maxCronScheduleDelayInSeconds = controllerConf.getMaxCronScheduleDelayInSeconds();
     if (controllerConf.isPinotTaskManagerSchedulerEnabled()) {
       try {
         _scheduler = new StdSchedulerFactory().getScheduler();
@@ -412,6 +417,8 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> {
       JobDataMap jobDataMap = new JobDataMap();
       jobDataMap.put(PINOT_TASK_MANAGER_KEY, this);
       jobDataMap.put(LEAD_CONTROLLER_MANAGER_KEY, _leadControllerManager);
+      jobDataMap.put(SKIP_LATE_CRON_SCHEDULE, _skipLateCronSchedule);
+      jobDataMap.put(MAX_CRON_SCHEDULE_DELAY_IN_SECONDS, _maxCronScheduleDelayInSeconds);
       JobDetail jobDetail =
           JobBuilder.newJob(CronJobScheduleJob.class).withIdentity(tableWithType, taskType).setJobData(jobDataMap)
               .build();
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManagerStatelessTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManagerStatelessTest.java
index cf41d1114e..6b6e0406ef 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManagerStatelessTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManagerStatelessTest.java
@@ -72,6 +72,41 @@ public class PinotTaskManagerStatelessTest extends ControllerTest {
     stopController();
   }
 
+  @Test
+  public void testSkipLateCronSchedule()
+      throws Exception {
+    Map<String, Object> properties = getDefaultControllerConfiguration();
+    properties.put(ControllerConf.ControllerPeriodicTasksConf.PINOT_TASK_MANAGER_SCHEDULER_ENABLED, true);
+    properties.put(ControllerConf.ControllerPeriodicTasksConf.TASK_MANAGER_SKIP_LATE_CRON_SCHEDULE, "true");
+    properties.put(ControllerConf.ControllerPeriodicTasksConf.TASK_MANAGER_MAX_CRON_SCHEDULE_DELAY_IN_SECONDS, "10");
+    startController(properties);
+    addFakeBrokerInstancesToAutoJoinHelixCluster(1, true);
+    addFakeServerInstancesToAutoJoinHelixCluster(1, true);
+    Schema schema = new Schema.SchemaBuilder().setSchemaName(RAW_TABLE_NAME)
+        .addSingleValueDimension("myMap", FieldSpec.DataType.STRING)
+        .addSingleValueDimension("myMapStr", FieldSpec.DataType.STRING)
+        .addSingleValueDimension("complexMapStr", FieldSpec.DataType.STRING).build();
+    addSchema(schema);
+    PinotTaskManager taskManager = _controllerStarter.getTaskManager();
+    Scheduler scheduler = taskManager.getScheduler();
+    assertNotNull(scheduler);
+
+    // Add Table with one task.
+    TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setTaskConfig(
+        new TableTaskConfig(
+            ImmutableMap.of("SegmentGenerationAndPushTask", ImmutableMap.of("schedule", "0 */10 * ? * * *")))).build();
+    addTableConfig(tableConfig);
+    waitForJobGroupNames(_controllerStarter.getTaskManager(),
+        jgn -> jgn.size() == 1 && jgn.contains(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE),
+        "JobGroupNames should have SegmentGenerationAndPushTask only");
+    validateJob(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE, "0 */10 * ? * * *", true, 10);
+
+    dropOfflineTable(RAW_TABLE_NAME);
+    waitForJobGroupNames(_controllerStarter.getTaskManager(), List::isEmpty, "JobGroupNames should be empty");
+    stopFakeInstances();
+    stopController();
+  }
+
   @Test
   public void testPinotTaskManagerSchedulerWithUpdate()
       throws Exception {
@@ -219,6 +254,11 @@ public class PinotTaskManagerStatelessTest extends ControllerTest {
 
   private void validateJob(String taskType, String cronExpression)
       throws Exception {
+    validateJob(taskType, cronExpression, false, 600);
+  }
+
+  private void validateJob(String taskType, String cronExpression, boolean skipLateCronSchedule, int maxDelayInSeconds)
+      throws Exception {
     PinotTaskManager taskManager = _controllerStarter.getTaskManager();
     Scheduler scheduler = taskManager.getScheduler();
     assert scheduler != null;
@@ -231,6 +271,8 @@ public class PinotTaskManagerStatelessTest extends ControllerTest {
     assertEquals(jobDetail.getKey().getGroup(), taskType);
     assertSame(jobDetail.getJobDataMap().get("PinotTaskManager"), taskManager);
     assertSame(jobDetail.getJobDataMap().get("LeadControllerManager"), _controllerStarter.getLeadControllerManager());
+    assertEquals(jobDetail.getJobDataMap().get("SkipLateCronSchedule"), skipLateCronSchedule);
+    assertEquals(jobDetail.getJobDataMap().get("MaxCronScheduleDelayInSeconds"), maxDelayInSeconds);
     // jobDetail and jobTrigger are not added atomically by the scheduler,
     // the jobDetail is added to an internal map firstly, and jobTrigger
     // is added to another internal map afterwards, so we check for the existence


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org