You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by xi...@apache.org on 2021/01/17 22:31:22 UTC

[incubator-pinot] branch adding_task_scheduler created (now 2907200)

This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a change to branch adding_task_scheduler
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git.


      at 2907200  Adding task scheduler for pinot tasks in cron expression

This branch includes the following new commits:

     new 2907200  Adding task scheduler for pinot tasks in cron expression

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[incubator-pinot] 01/01: Adding task scheduler for pinot tasks in cron expression

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch adding_task_scheduler
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 29072007dd1ca2d5ef379cd5290bd90363f48f9b
Author: Xiang Fu <fx...@gmail.com>
AuthorDate: Sun Jan 17 14:30:55 2021 -0800

    Adding task scheduler for pinot tasks in cron expression
---
 pinot-controller/pom.xml                           |   4 +
 .../api/resources/PinotTaskRestletResource.java    |  60 +++++
 .../helix/core/minion/CronJobScheduleJob.java      |  54 ++++
 .../helix/core/minion/PinotTaskManager.java        | 275 +++++++++++++++++++++
 .../core/minion/TableTaskSchedulerUpdater.java     |  44 ++++
 pom.xml                                            |   6 +
 6 files changed, 443 insertions(+)

diff --git a/pinot-controller/pom.xml b/pinot-controller/pom.xml
index 1bd4890..9746a0e 100644
--- a/pinot-controller/pom.xml
+++ b/pinot-controller/pom.xml
@@ -204,6 +204,10 @@
       <version>${project.version}</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.quartz-scheduler</groupId>
+      <artifactId>quartz</artifactId>
+    </dependency>
   </dependencies>
   <build>
     <resources>
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java
index aa3235e..3e021c3 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java
@@ -22,6 +22,8 @@ import io.swagger.annotations.Api;
 import io.swagger.annotations.ApiOperation;
 import io.swagger.annotations.ApiParam;
 import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -38,6 +40,7 @@ import org.apache.helix.task.TaskState;
 import org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager;
 import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
 import org.apache.pinot.core.minion.PinotTaskConfig;
+import org.quartz.SchedulerException;
 
 
 /**
@@ -167,6 +170,63 @@ public class PinotTaskRestletResource {
     return _pinotHelixTaskResourceManager.getTaskConfigs(taskName);
   }
 
+  /**
+   * Returns the cron expressions currently tracked by the task manager, keyed by table name and then by
+   * task type.
+   *
+   * @param taskType optional task type filter (case-insensitive); {@code null} matches every task type
+   * @param tableName optional table name filter (case-insensitive); {@code null} matches every table
+   * @return table name to (task type to cron expression); tables without a matching task are omitted
+   */
+  @GET
+  @Path("/tasks/cron/schedules")
+  @ApiOperation("Fetch cron tasks schedule")
+  public Map<String, Map<String, String>> getCronSchedules(
+      @ApiParam(value = "Task type") @QueryParam("taskType") String taskType,
+      @ApiParam(value = "Table name (with type suffix)") @QueryParam("tableName") String tableName) {
+    Map<String, Map<String, String>> results = new HashMap<>();
+    for (Map.Entry<String, Map<String, String>> tableEntry : _pinotTaskManager
+        .getTableTaskTypeToCronExpressionMap().entrySet()) {
+      String table = tableEntry.getKey();
+      // Filter on the requested table name; comparing the key with itself would never filter anything out
+      if (tableName != null && !table.equalsIgnoreCase(tableName)) {
+        continue;
+      }
+      for (Map.Entry<String, String> taskEntry : tableEntry.getValue().entrySet()) {
+        String task = taskEntry.getKey();
+        if (taskType == null || task.equalsIgnoreCase(taskType)) {
+          results.computeIfAbsent(table, k -> new HashMap<>()).put(task, taskEntry.getValue());
+        }
+      }
+    }
+    return results;
+  }
+
+  /**
+   * Returns the next fire time of every tracked cron job, keyed by table name and then by task type.
+   *
+   * @param taskType optional task type filter (case-insensitive); {@code null} matches every task type
+   * @param tableName optional table name filter (case-insensitive); {@code null} matches every table
+   * @return table name to (task type to next fire time); tables without a matching task are omitted
+   * @throws RuntimeException wrapping the {@link SchedulerException} raised while reading the triggers
+   */
+  @GET
+  @Path("/tasks/cron/nextruntimes")
+  @ApiOperation("Fetch cron tasks next runtimes")
+  public Map<String, Map<String, Date>> getCronNextRuntimes(
+      @ApiParam(value = "Task type") @QueryParam("taskType") String taskType,
+      @ApiParam(value = "Table name (with type suffix)") @QueryParam("tableName") String tableName) {
+    Map<String, Map<String, Date>> tableTaskTypeToNextRuntimeMap;
+    try {
+      tableTaskTypeToNextRuntimeMap = _pinotTaskManager.getTableTaskTypeToNextRuntimeMap();
+    } catch (SchedulerException e) {
+      throw new RuntimeException(e);
+    }
+    Map<String, Map<String, Date>> results = new HashMap<>();
+    for (Map.Entry<String, Map<String, Date>> tableEntry : tableTaskTypeToNextRuntimeMap.entrySet()) {
+      String table = tableEntry.getKey();
+      // Filter on the requested table name; comparing the key with itself would never filter anything out
+      if (tableName != null && !table.equalsIgnoreCase(tableName)) {
+        continue;
+      }
+      for (Map.Entry<String, Date> taskEntry : tableEntry.getValue().entrySet()) {
+        String task = taskEntry.getKey();
+        if (taskType == null || task.equalsIgnoreCase(taskType)) {
+          results.computeIfAbsent(table, k -> new HashMap<>()).put(task, taskEntry.getValue());
+        }
+      }
+    }
+    return results;
+  }
+
   @POST
   @Path("/tasks/schedule")
   @ApiOperation("Schedule tasks and return a map from task type to task name scheduled")
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/CronJobScheduleJob.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/CronJobScheduleJob.java
new file mode 100644
index 0000000..e5653f4
--- /dev/null
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/CronJobScheduleJob.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.minion;
+
+import org.apache.pinot.controller.LeadControllerManager;
+import org.quartz.Job;
+import org.quartz.JobExecutionContext;
+import org.quartz.JobExecutionException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Quartz job fired by a per-table, per-task-type cron trigger.
+ *
+ * <p>The job key carries the table name (key name) and the task type (key group). The job data map carries
+ * the {@link PinotTaskManager} and the {@link LeadControllerManager}. The task is scheduled only when this
+ * controller is the leader for the table.
+ */
+public class CronJobScheduleJob implements Job {
+  private static final Logger LOGGER = LoggerFactory.getLogger(CronJobScheduleJob.class);
+
+  // Explicit public no-argument constructor, kept so the job class stays instantiable without arguments
+  public CronJobScheduleJob() {
+  }
+
+  /**
+   * Schedules the Pinot task described by the job key, or logs and skips when this controller is not the
+   * leader for the table.
+   */
+  @Override
+  public void execute(JobExecutionContext jobExecutionContext)
+      throws JobExecutionException {
+    PinotTaskManager pinotTaskManager = (PinotTaskManager) jobExecutionContext.getJobDetail().getJobDataMap()
+        .get(PinotTaskManager.PINOT_TASK_MANAGER_KEY);
+    LeadControllerManager leadControllerManager =
+        (LeadControllerManager) jobExecutionContext.getJobDetail().getJobDataMap()
+            .get(PinotTaskManager.LEAD_CONTROLLER_MANAGER_KEY);
+    String table = jobExecutionContext.getJobDetail().getKey().getName();
+    String taskType = jobExecutionContext.getJobDetail().getKey().getGroup();
+    if (!leadControllerManager.isLeaderForTable(table)) {
+      LOGGER.info("Not Lead, skip processing CronJob: table - {}, task - {}", table, taskType);
+      return;
+    }
+    LOGGER.info("Execute CronJob: table - {}, task - {} at {}", table, taskType, jobExecutionContext.getFireTime());
+    pinotTaskManager.scheduleTask(taskType, table);
+    LOGGER.info("Finished CronJob: table - {}, task - {}, next runtime is {}", table, taskType,
+        jobExecutionContext.getNextFireTime());
+  }
+}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
index fe97326..18906dc 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
@@ -21,10 +21,13 @@ package org.apache.pinot.controller.helix.core.minion;
 import com.google.common.base.Preconditions;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Date;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import javax.annotation.Nullable;
 import org.apache.pinot.common.metrics.ControllerMeter;
 import org.apache.pinot.common.metrics.ControllerMetrics;
@@ -36,6 +39,18 @@ import org.apache.pinot.controller.helix.core.minion.generator.TaskGeneratorRegi
 import org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
 import org.apache.pinot.core.minion.PinotTaskConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableTaskConfig;
+import org.quartz.CronScheduleBuilder;
+import org.quartz.JobBuilder;
+import org.quartz.JobDataMap;
+import org.quartz.JobDetail;
+import org.quartz.JobKey;
+import org.quartz.Scheduler;
+import org.quartz.SchedulerException;
+import org.quartz.Trigger;
+import org.quartz.TriggerBuilder;
+import org.quartz.TriggerKey;
+import org.quartz.impl.StdSchedulerFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -49,9 +64,21 @@ import org.slf4j.LoggerFactory;
 public class PinotTaskManager extends ControllerPeriodicTask<Void> {
   private static final Logger LOGGER = LoggerFactory.getLogger(PinotTaskManager.class);
 
+  // Keys under which this manager and the lead controller manager are stored in each job's JobDataMap
+  public final static String PINOT_TASK_MANAGER_KEY = "PinotTaskManager";
+  public final static String LEAD_CONTROLLER_MANAGER_KEY = "LeadControllerManager";
+  // Property store path holding one child per table config, and the prefix used to build a table's path
+  private static final String TABLE_CONFIG_PARENT_PATH = "/CONFIGS/TABLE";
+  private static final String TABLE_CONFIG_PATH_PREFIX = "/CONFIGS/TABLE/";
+
+  // Key inside a task type's config map whose value is read as the cron expression
+  private final static String SCHEDULE_KEY = "schedule";
+  // Separator used when a table name and a task type are joined into a single string
+  private final static String TABLE_TASK_TYPE_SPLIT = "\t\t";
+
   private final PinotHelixTaskResourceManager _helixTaskResourceManager;
   private final ClusterInfoAccessor _clusterInfoAccessor;
   private final TaskGeneratorRegistry _taskGeneratorRegistry;
+  // Table name (with type) -> task type -> cron expression last handed to the scheduler
+  private final Map<String, Map<String, String>> _tableTaskTypeToCronExpressionMap = new ConcurrentHashMap<>();
+  // Table name (with type) -> data-change listener registered on that table's config path
+  private final Map<String, TableTaskSchedulerUpdater> _tableTaskSchedulerUpdaterMap = new ConcurrentHashMap<>();
+
+  // Quartz scheduler; stays null when the constructor fails to create it
+  private Scheduler _scheduledExecutorService = null;
 
   public PinotTaskManager(PinotHelixTaskResourceManager helixTaskResourceManager,
       PinotHelixResourceManager helixResourceManager, LeadControllerManager leadControllerManager,
@@ -62,6 +89,254 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> {
     _helixTaskResourceManager = helixTaskResourceManager;
     _clusterInfoAccessor = new ClusterInfoAccessor(helixResourceManager, helixTaskResourceManager, controllerConf);
     _taskGeneratorRegistry = new TaskGeneratorRegistry(_clusterInfoAccessor);
+    try {
+      _scheduledExecutorService = new StdSchedulerFactory().getScheduler();
+      _scheduledExecutorService.start();
+      LOGGER.info("Subscribe to tables change under PropertyStore path: {}", TABLE_CONFIG_PARENT_PATH);
+      _pinotHelixResourceManager.getPropertyStore()
+          .subscribeChildChanges(TABLE_CONFIG_PARENT_PATH, (parentPath, currentChilds) -> {
+            for (String tableWithType : currentChilds) {
+              subscribeTableConfigChanges(tableWithType);
+            }
+            Set<String> tableToDelete = new HashSet(_tableTaskSchedulerUpdaterMap.keySet());
+            tableToDelete.removeAll(currentChilds);
+            if (!tableToDelete.isEmpty()) {
+              LOGGER.info("Found tables to delete: {}", tableToDelete);
+              for (String tableWithType : tableToDelete) {
+                cleanUpCronTaskSchedulerForTable(tableWithType);
+              }
+            }
+          });
+      for (String tableWithType : helixResourceManager.getAllTables()) {
+        subscribeTableConfigChanges(tableWithType);
+      }
+    } catch (SchedulerException e) {
+      LOGGER.error("Unable to create a scheduler.", e);
+    }
+  }
+
+  /**
+   * Builds the property store path of a table config by appending the table name to
+   * {@code TABLE_CONFIG_PATH_PREFIX}.
+   */
+  private String getPropertyStorePathForTable(String tableWithType) {
+    return TABLE_CONFIG_PATH_PREFIX + tableWithType;
+  }
+
+  /**
+   * Clears everything held by the Quartz scheduler. Failures are logged rather than propagated.
+   *
+   * <p>Does nothing when the scheduler could not be created, in which case the field is still null.
+   */
+  protected synchronized void cleanupCronTaskScheduler() {
+    if (_scheduledExecutorService == null) {
+      return;
+    }
+    try {
+      _scheduledExecutorService.clear();
+    } catch (SchedulerException e) {
+      LOGGER.error("Failed to clear all tasks in scheduler", e);
+    }
+  }
+
+  /**
+   * Drops everything tracked for a table: unsubscribes its config listener, deletes its scheduled jobs and
+   * forgets its cron expressions.
+   *
+   * <p>Safe to call more than once for the same table, and for a table that never had a cron schedule.
+   * Job deletion failures are logged and do not stop the cleanup of the remaining task types.
+   *
+   * @param tableWithType table name with type suffix
+   */
+  public synchronized void cleanUpCronTaskSchedulerForTable(String tableWithType) {
+    LOGGER.info("Cleaning up task in scheduler for table {}", tableWithType);
+    // Remove first so a second cleanup call finds nothing and never passes a null listener
+    TableTaskSchedulerUpdater tableTaskSchedulerUpdater = _tableTaskSchedulerUpdaterMap.remove(tableWithType);
+    if (tableTaskSchedulerUpdater != null) {
+      _pinotHelixResourceManager.getPropertyStore()
+          .unsubscribeDataChanges(getPropertyStorePathForTable(tableWithType), tableTaskSchedulerUpdater);
+    }
+    // Remove the cron entry too, otherwise a re-created table would be treated as already scheduled
+    Map<String, String> taskTypeToCronExpressionMap = _tableTaskTypeToCronExpressionMap.remove(tableWithType);
+    if (taskTypeToCronExpressionMap == null || _scheduledExecutorService == null) {
+      return;
+    }
+    for (String taskType : taskTypeToCronExpressionMap.keySet()) {
+      try {
+        _scheduledExecutorService.deleteJob(JobKey.jobKey(tableWithType, taskType));
+      } catch (SchedulerException e) {
+        LOGGER.error("Failed to delete job for table {}, task type {}", tableWithType, taskType, e);
+      }
+    }
+  }
+
+  /**
+   * Registers a data-change listener on the table's config path (once per table) and then brings the
+   * table's cron jobs in line with its current config.
+   *
+   * <p>The schedule update is skipped when the scheduler is null. Any exception it raises is logged and
+   * swallowed.
+   *
+   * @param tableWithType table name with type suffix
+   */
+  public synchronized void subscribeTableConfigChanges(String tableWithType) {
+    String tableConfigPath = getPropertyStorePathForTable(tableWithType);
+    LOGGER.info("Trying to subscribeTableConfigChanges: {}", tableConfigPath);
+    if (_tableTaskSchedulerUpdaterMap.containsKey(tableWithType)) {
+      LOGGER.info("Skipping subscribeTableConfigChanges: {}", tableConfigPath);
+      return;
+    }
+    TableTaskSchedulerUpdater updater = new TableTaskSchedulerUpdater(tableWithType, this);
+    LOGGER.info("subscribeTableConfigChanges: {}", tableConfigPath);
+    _pinotHelixResourceManager.getPropertyStore().subscribeDataChanges(tableConfigPath, updater);
+    _tableTaskSchedulerUpdaterMap.put(tableWithType, updater);
+    LOGGER.info("Finished subscribeTableConfigChanges: {}", tableConfigPath);
+    LOGGER.info("Trying to update task schedule for table: {}", tableWithType);
+    if (_scheduledExecutorService == null) {
+      return;
+    }
+    try {
+      updateCronTaskScheduler(tableWithType);
+    } catch (Exception e) {
+      LOGGER.error("Got exception during updateCronTaskScheduler for {}", tableWithType, e);
+    }
+  }
+
+  /**
+   * Re-reads the table config and reconciles the table's cron jobs with the schedules it declares.
+   *
+   * <p>When the table config, its task config or its task type configs are missing, every job tracked for
+   * the table is removed.
+   *
+   * @param tableWithType table name with type suffix
+   */
+  public synchronized void updateCronTaskScheduler(String tableWithType) {
+    TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableWithType);
+    // NOTE(review): assumes getTableConfig can return null for a missing table; confirm against its contract
+    if (tableConfig == null) {
+      LOGGER.info("tableConfig is null, trying to remove all the tasks for table {} if any", tableWithType);
+      removeAllTasksFromCronExpressions(tableWithType);
+      return;
+    }
+    TableTaskConfig taskConfig = tableConfig.getTaskConfig();
+    if (taskConfig == null) {
+      LOGGER.info("taskConfig is null, trying to remove all the tasks for table {} if any", tableWithType);
+      removeAllTasksFromCronExpressions(tableWithType);
+      return;
+    }
+    Map<String, Map<String, String>> taskTypeConfigsMap = taskConfig.getTaskTypeConfigsMap();
+    if (taskTypeConfigsMap == null) {
+      LOGGER.info("taskTypeConfigsMap is null, trying to remove all the tasks for table {} if any", tableWithType);
+      removeAllTasksFromCronExpressions(tableWithType);
+      return;
+    }
+    Map<String, String> taskToCronExpressionMap = getTaskToCronExpressionMap(taskTypeConfigsMap);
+    LOGGER.info("Got taskToCronExpressionMap {} ", taskToCronExpressionMap);
+    updateCronTaskScheduler(tableWithType, taskToCronExpressionMap);
+  }
+
+  /**
+   * Reconciles the scheduled jobs of one table with the desired task type to cron expression mapping, then
+   * records that mapping as the table's current state.
+   *
+   * <ul>
+   *   <li>task types tracked before but absent from the new mapping have their job deleted;</li>
+   *   <li>task types not tracked before are scheduled, including new ones on an already tracked table;</li>
+   *   <li>task types whose cron expression changed (case-insensitive comparison) are rescheduled.</li>
+   * </ul>
+   *
+   * <p>A failure on one task type is logged and does not stop the others from being processed.
+   *
+   * @param tableWithType table name with type suffix
+   * @param taskToCronExpressionMap desired task type to cron expression mapping for the table
+   */
+  private void updateCronTaskScheduler(String tableWithType, Map<String, String> taskToCronExpressionMap) {
+    Map<String, String> existingScheduledTasks =
+        _tableTaskTypeToCronExpressionMap.getOrDefault(tableWithType, Collections.emptyMap());
+    // Task should be removed
+    for (String existingTaskType : existingScheduledTasks.keySet()) {
+      if (taskToCronExpressionMap.containsKey(existingTaskType)) {
+        continue;
+      }
+      try {
+        _scheduledExecutorService.deleteJob(JobKey.jobKey(tableWithType, existingTaskType));
+      } catch (SchedulerException e) {
+        LOGGER.error("Failed to delete scheduled job for table {}, task type {}", tableWithType, existingTaskType,
+            e);
+      }
+    }
+    // Iterate the desired mapping (not the existing one) so newly added task types get scheduled too
+    for (Map.Entry<String, String> entry : taskToCronExpressionMap.entrySet()) {
+      String taskType = entry.getKey();
+      String newCronExpression = entry.getValue();
+      String existingCronExpression = existingScheduledTasks.get(taskType);
+      if (existingCronExpression == null) {
+        // Schedule new job
+        try {
+          scheduleJob(tableWithType, taskType, newCronExpression);
+        } catch (SchedulerException | RuntimeException e) {
+          LOGGER.error("Failed to schedule cron task for table {}, task {}, cron expr {}", tableWithType, taskType,
+              newCronExpression, e);
+        }
+      } else if (!existingCronExpression.equalsIgnoreCase(newCronExpression)) {
+        // Update existing task with new cron expr
+        try {
+          TriggerKey triggerKey = TriggerKey.triggerKey(tableWithType, taskType);
+          Trigger trigger = TriggerBuilder.newTrigger().withIdentity(triggerKey)
+              .withSchedule(CronScheduleBuilder.cronSchedule(newCronExpression)).build();
+          _scheduledExecutorService.rescheduleJob(triggerKey, trigger);
+        } catch (SchedulerException | RuntimeException e) {
+          // cronSchedule(String) reports an invalid expression as an unchecked exception
+          LOGGER.error("Failed to reschedule cron task for table {}, task {}, cron expr {}", tableWithType,
+              taskType, newCronExpression, e);
+        }
+      }
+    }
+    _tableTaskTypeToCronExpressionMap.put(tableWithType, taskToCronExpressionMap);
+  }
+
+  /**
+   * Schedules a cron job for the given table and task type unless a job with that key already exists.
+   *
+   * <p>The job and its trigger both use the table name as key name and the task type as key group. The job
+   * data map carries this manager and the lead controller manager for {@link CronJobScheduleJob}.
+   *
+   * @param tableWithType table name with type suffix
+   * @param taskType task type
+   * @param cronExprStr cron expression for the trigger
+   * @throws SchedulerException if the cron expression is rejected or the scheduler refuses the job
+   */
+  private void scheduleJob(String tableWithType, String taskType, String cronExprStr)
+      throws SchedulerException {
+    boolean exists = false;
+    try {
+      exists = _scheduledExecutorService.checkExists(JobKey.jobKey(tableWithType, taskType));
+    } catch (SchedulerException e) {
+      // Best effort: treat the job as absent and let the schedule call below report any conflict
+      LOGGER.error("Failed to check job existence for job key - table: {}, task: {} ", tableWithType, taskType, e);
+    }
+    if (exists) {
+      return;
+    }
+    LOGGER
+        .info("Trying to put cron expression: {} for table {}, task type: {}", cronExprStr, tableWithType, taskType);
+    Trigger trigger;
+    try {
+      trigger = TriggerBuilder.newTrigger().withIdentity(TriggerKey.triggerKey(tableWithType, taskType))
+          .withSchedule(CronScheduleBuilder.cronSchedule(cronExprStr)).build();
+    } catch (RuntimeException e) {
+      // cronSchedule(String) reports an invalid expression as an unchecked exception; convert it so that
+      // callers catching SchedulerException can log it and carry on with the remaining task types
+      throw new SchedulerException("Failed to parse Cron expression - " + cronExprStr, e);
+    }
+    JobDataMap jobDataMap = new JobDataMap();
+    jobDataMap.put(PINOT_TASK_MANAGER_KEY, this);
+    jobDataMap.put(LEAD_CONTROLLER_MANAGER_KEY, this._leadControllerManager);
+    JobDetail jobDetail =
+        JobBuilder.newJob(CronJobScheduleJob.class).withIdentity(tableWithType, taskType).setJobData(jobDataMap)
+            .build();
+    try {
+      _scheduledExecutorService.scheduleJob(jobDetail, trigger);
+    } catch (SchedulerException e) {
+      LOGGER.error("Failed to schedule job for table: {}, task type: {}, cron expression: {}", tableWithType,
+          taskType, cronExprStr, e);
+      throw e;
+    }
+    Date nextRuntime = trigger.getNextFireTime();
+    LOGGER
+        .info("Scheduled task for table: {}, task type: {}, next runtime: {}", tableWithType, taskType, nextRuntime);
+  }
+
+  /**
+   * Extracts the cron expression of every task type that declares one.
+   *
+   * @param taskTypeConfigsMap task type to config map
+   * @return task type to the non-null value stored under {@code SCHEDULE_KEY}; task types with a null
+   *     config, or with no value or a null value under that key, are left out
+   */
+  private Map<String, String> getTaskToCronExpressionMap(Map<String, Map<String, String>> taskTypeConfigsMap) {
+    Map<String, String> cronExpressions = new HashMap<>();
+    for (Map.Entry<String, Map<String, String>> entry : taskTypeConfigsMap.entrySet()) {
+      Map<String, String> config = entry.getValue();
+      // A missing key and a null value both come back as null from get()
+      String cronExpression = config == null ? null : config.get(SCHEDULE_KEY);
+      if (cronExpression != null) {
+        cronExpressions.put(entry.getKey(), cronExpression);
+      }
+    }
+    return cronExpressions;
+  }
+
+  /**
+   * Forgets the cron expressions tracked for a table and deletes the matching scheduled jobs. Does nothing
+   * when the table is not tracked. Deletion failures are logged per task type.
+   */
+  private void removeAllTasksFromCronExpressions(String tableWithType) {
+    Map<String, String> removedTasks = _tableTaskTypeToCronExpressionMap.remove(tableWithType);
+    if (removedTasks != null) {
+      for (String taskType : removedTasks.keySet()) {
+        JobKey jobKey = JobKey.jobKey(tableWithType, taskType);
+        try {
+          _scheduledExecutorService.deleteJob(jobKey);
+        } catch (SchedulerException e) {
+          LOGGER
+              .error("Got exception when deleting the scheduled job - table {}, task type {}", tableWithType, taskType,
+                  e);
+        }
+      }
+    }
+  }
+
+  /**
+   * Looks up the next fire time of the trigger of every tracked table and task type.
+   *
+   * <p>Entries whose trigger is missing, or whose trigger has no job key, are skipped. A stored fire time
+   * may be null when the trigger reports none.
+   *
+   * @return table name to (task type to next fire time)
+   * @throws SchedulerException if a trigger lookup fails
+   */
+  public Map<String, Map<String, Date>> getTableTaskTypeToNextRuntimeMap()
+      throws SchedulerException {
+    Map<String, Map<String, Date>> tableTaskTypeToNextRuntimeMap = new HashMap<>();
+    // Iterate entries rather than keySet() + get(): this method is not synchronized, and a table removed
+    // between the two calls would make get() return null
+    for (Map.Entry<String, Map<String, String>> tableEntry : _tableTaskTypeToCronExpressionMap.entrySet()) {
+      String table = tableEntry.getKey();
+      for (String taskType : tableEntry.getValue().keySet()) {
+        TriggerKey triggerKey = TriggerKey.triggerKey(table, taskType);
+        LOGGER.info("Trying to get trigger for trigger key: {}", triggerKey);
+        Trigger trigger = _scheduledExecutorService.getTrigger(triggerKey);
+        if (trigger == null || trigger.getJobKey() == null) {
+          continue;
+        }
+        tableTaskTypeToNextRuntimeMap.computeIfAbsent(table, k -> new HashMap<>())
+            .put(taskType, trigger.getNextFireTime());
+      }
+    }
+    LOGGER.info("getTableTaskTypeToNextRuntimeMap: {}", tableTaskTypeToNextRuntimeMap);
+    return tableTaskTypeToNextRuntimeMap;
+  }
+
+  /**
+   * Logs and returns the table name to (task type to cron expression) mapping.
+   *
+   * <p>The returned map is the internal instance, not a copy, so changes made through it are visible to
+   * this manager.
+   */
+  public Map<String, Map<String, String>> getTableTaskTypeToCronExpressionMap() {
+    LOGGER.info("getTableTaskTypeToCronExpressionMap: {}", _tableTaskTypeToCronExpressionMap);
+    return _tableTaskTypeToCronExpressionMap;
+  }
+
+  /**
+   * Joins a table name and a task type into one string, separated by {@code TABLE_TASK_TYPE_SPLIT}.
+   */
+  public static String getTableTaskType(String tableWithType, String taskType) {
+    return tableWithType + TABLE_TASK_TYPE_SPLIT + taskType;
+  }
+
+  /**
+   * Returns the part of the combined string that precedes the first {@code TABLE_TASK_TYPE_SPLIT}
+   * separator, or the whole string when the separator does not occur.
+   */
+  public static String getTableNameFromTableTaskType(String tableTaskType) {
+    // The limit of 2 splits on the first separator only
+    return tableTaskType.split(TABLE_TASK_TYPE_SPLIT, 2)[0];
+  }
+
+  /**
+   * Returns the part of the combined string that follows the first {@code TABLE_TASK_TYPE_SPLIT}
+   * separator.
+   *
+   * @throws ArrayIndexOutOfBoundsException if the string does not contain the separator
+   */
+  public static String getTaskTypeFromTableTaskType(String tableTaskType) {
+    // The limit of 2 keeps any later separator inside the returned task type
+    return tableTaskType.split(TABLE_TASK_TYPE_SPLIT, 2)[1];
   }
 
   /**
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/TableTaskSchedulerUpdater.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/TableTaskSchedulerUpdater.java
new file mode 100644
index 0000000..3381242
--- /dev/null
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/TableTaskSchedulerUpdater.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.minion;
+
+import org.I0Itec.zkclient.IZkDataListener;
+
+
+/**
+ * Data listener bound to a single table. It forwards data changes and deletions to the
+ * {@link PinotTaskManager} so the table's cron jobs can be updated or cleaned up.
+ */
+public class TableTaskSchedulerUpdater implements IZkDataListener {
+  // Table name (with type suffix) this listener was created for
+  private final String _tableWithType;
+  private final PinotTaskManager _pinotTaskManager;
+
+  public TableTaskSchedulerUpdater(String tableWithType, PinotTaskManager pinotTaskManager) {
+    _tableWithType = tableWithType;
+    _pinotTaskManager = pinotTaskManager;
+  }
+
+  /**
+   * Asks the task manager to update the table's cron schedule. The path and payload arguments are ignored.
+   */
+  @Override
+  public void handleDataChange(String dataPath, Object data)
+      throws Exception {
+    _pinotTaskManager.updateCronTaskScheduler(_tableWithType);
+  }
+
+  /**
+   * Asks the task manager to clean up the scheduler state kept for the table. The path argument is ignored.
+   */
+  @Override
+  public void handleDataDeleted(String dataPath)
+      throws Exception {
+    _pinotTaskManager.cleanUpCronTaskSchedulerForTable(_tableWithType);
+  }
+}
diff --git a/pom.xml b/pom.xml
index 7711a57..f9f682c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -128,6 +128,7 @@
     <scala.version>2.11.11</scala.version>
     <antlr.version>4.6</antlr.version>
     <jsonpath.version>2.4.0</jsonpath.version>
+    <quartz.version>2.3.0</quartz.version>
     <calcite.version>1.19.0</calcite.version>
     <lucene.version>8.2.0</lucene.version>
     <reflections.version>0.9.11</reflections.version>
@@ -510,6 +511,11 @@
         <version>2.0.1</version>
       </dependency>
       <dependency>
+        <groupId>org.quartz-scheduler</groupId>
+        <artifactId>quartz</artifactId>
+        <version>${quartz.version}</version>
+      </dependency>
+      <dependency>
         <groupId>javax.validation</groupId>
         <artifactId>validation-api</artifactId>
         <version>2.0.1.Final</version>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org