You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by xi...@apache.org on 2021/01/17 22:31:23 UTC

[incubator-pinot] 01/01: Adding task scheduler for pinot tasks in cron expression

This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch adding_task_scheduler
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 29072007dd1ca2d5ef379cd5290bd90363f48f9b
Author: Xiang Fu <fx...@gmail.com>
AuthorDate: Sun Jan 17 14:30:55 2021 -0800

    Adding task scheduler for pinot tasks in cron expression
---
 pinot-controller/pom.xml                           |   4 +
 .../api/resources/PinotTaskRestletResource.java    |  60 +++++
 .../helix/core/minion/CronJobScheduleJob.java      |  54 ++++
 .../helix/core/minion/PinotTaskManager.java        | 275 +++++++++++++++++++++
 .../core/minion/TableTaskSchedulerUpdater.java     |  44 ++++
 pom.xml                                            |   6 +
 6 files changed, 443 insertions(+)

diff --git a/pinot-controller/pom.xml b/pinot-controller/pom.xml
index 1bd4890..9746a0e 100644
--- a/pinot-controller/pom.xml
+++ b/pinot-controller/pom.xml
@@ -204,6 +204,10 @@
       <version>${project.version}</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.quartz-scheduler</groupId>
+      <artifactId>quartz</artifactId>
+    </dependency>
   </dependencies>
   <build>
     <resources>
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java
index aa3235e..3e021c3 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java
@@ -22,6 +22,8 @@ import io.swagger.annotations.Api;
 import io.swagger.annotations.ApiOperation;
 import io.swagger.annotations.ApiParam;
 import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -38,6 +40,7 @@ import org.apache.helix.task.TaskState;
 import org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager;
 import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
 import org.apache.pinot.core.minion.PinotTaskConfig;
+import org.quartz.SchedulerException;
 
 
 /**
@@ -167,6 +170,63 @@ public class PinotTaskRestletResource {
     return _pinotHelixTaskResourceManager.getTaskConfigs(taskName);
   }
 
+  @GET
+  @Path("/tasks/cron/schedules")
+  @ApiOperation("Fetch cron tasks schedule")
+  public Map<String, Map<String, String>> getCronSchedules(
+      @ApiParam(value = "Task type") @QueryParam("taskType") String taskType,
+      @ApiParam(value = "Table name (with type suffix)") @QueryParam("tableName") String tableName) {
+    Map<String, Map<String, String>> tableTaskTypeToCronExpressionMap = _pinotTaskManager.getTableTaskTypeToCronExpressionMap();
+    Map<String, Map<String, String>> results = new HashMap<>();
+    for (String table : tableTaskTypeToCronExpressionMap.keySet()) {
+      if (tableName != null && !table.equalsIgnoreCase(table)) {
+        continue;
+      }
+      Map<String, String> taskTypeToCronExpressionMap = tableTaskTypeToCronExpressionMap.get(table);
+      for (String task : taskTypeToCronExpressionMap.keySet()) {
+        if (taskType != null && !task.equalsIgnoreCase(taskType)) {
+          continue;
+        }
+        if (!results.containsKey(table)) {
+          results.put(table, new HashMap<>());
+        }
+        results.get(table).put(task, taskTypeToCronExpressionMap.get(task));
+      }
+    }
+    return results;
+  }
+
+  @GET
+  @Path("/tasks/cron/nextruntimes")
+  @ApiOperation("Fetch cron tasks next runtimes")
+  public Map<String, Map<String, Date>> getCronNextRuntimes(
+      @ApiParam(value = "Task type") @QueryParam("taskType") String taskType,
+      @ApiParam(value = "Table name (with type suffix)") @QueryParam("tableName") String tableName) {
+    try {
+      Map<String, Map<String, Date>> tableTaskTypeToNextRuntimeMap =
+          _pinotTaskManager.getTableTaskTypeToNextRuntimeMap();
+      Map<String, Map<String, Date>> results = new HashMap<>();
+      for (String table : tableTaskTypeToNextRuntimeMap.keySet()) {
+        if (tableName != null && !table.equalsIgnoreCase(table)) {
+          continue;
+        }
+        Map<String, Date> taskTypeToNextRuntimeMap = tableTaskTypeToNextRuntimeMap.get(table);
+        for (String task : taskTypeToNextRuntimeMap.keySet()) {
+          if (taskType != null && !task.equalsIgnoreCase(taskType)) {
+            continue;
+          }
+          if (!results.containsKey(table)) {
+            results.put(table, new HashMap<>());
+          }
+          results.get(table).put(task, taskTypeToNextRuntimeMap.get(task));
+        }
+      }
+      return results;
+    } catch (SchedulerException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
   @POST
   @Path("/tasks/schedule")
   @ApiOperation("Schedule tasks and return a map from task type to task name scheduled")
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/CronJobScheduleJob.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/CronJobScheduleJob.java
new file mode 100644
index 0000000..e5653f4
--- /dev/null
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/CronJobScheduleJob.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.minion;
+
+import org.apache.pinot.controller.LeadControllerManager;
+import org.quartz.Job;
+import org.quartz.JobExecutionContext;
+import org.quartz.JobExecutionException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class CronJobScheduleJob implements Job {
+  private static final Logger LOGGER = LoggerFactory.getLogger(CronJobScheduleJob.class);
+
+  public CronJobScheduleJob() {
+  }
+
+  @Override
+  public void execute(JobExecutionContext jobExecutionContext)
+      throws JobExecutionException {
+    PinotTaskManager pinotTaskManager = (PinotTaskManager) jobExecutionContext.getJobDetail().getJobDataMap()
+        .get(PinotTaskManager.PINOT_TASK_MANAGER_KEY);
+    LeadControllerManager leadControllerManager =
+        (LeadControllerManager) jobExecutionContext.getJobDetail().getJobDataMap()
+            .get(PinotTaskManager.LEAD_CONTROLLER_MANAGER_KEY);
+    String table = jobExecutionContext.getJobDetail().getKey().getName();
+    String taskType = jobExecutionContext.getJobDetail().getKey().getGroup();
+    if (leadControllerManager.isLeaderForTable(table)) {
+      LOGGER.info("Execute CronJob: table - {}, task - {} at {}", table, taskType, jobExecutionContext.getFireTime());
+      pinotTaskManager.scheduleTask(taskType, table);
+      LOGGER.info("Finished CronJob: table - {}, task - {}, next runtime is {}", table, taskType,
+          jobExecutionContext.getNextFireTime());
+    } else {
+      LOGGER.info("Not Lead, skip processing CronJob: table - {}, task - {}", table, taskType);
+    }
+  }
+}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
index fe97326..18906dc 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
@@ -21,10 +21,13 @@ package org.apache.pinot.controller.helix.core.minion;
 import com.google.common.base.Preconditions;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Date;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import javax.annotation.Nullable;
 import org.apache.pinot.common.metrics.ControllerMeter;
 import org.apache.pinot.common.metrics.ControllerMetrics;
@@ -36,6 +39,18 @@ import org.apache.pinot.controller.helix.core.minion.generator.TaskGeneratorRegi
 import org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
 import org.apache.pinot.core.minion.PinotTaskConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableTaskConfig;
+import org.quartz.CronScheduleBuilder;
+import org.quartz.JobBuilder;
+import org.quartz.JobDataMap;
+import org.quartz.JobDetail;
+import org.quartz.JobKey;
+import org.quartz.Scheduler;
+import org.quartz.SchedulerException;
+import org.quartz.Trigger;
+import org.quartz.TriggerBuilder;
+import org.quartz.TriggerKey;
+import org.quartz.impl.StdSchedulerFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -49,9 +64,21 @@ import org.slf4j.LoggerFactory;
 public class PinotTaskManager extends ControllerPeriodicTask<Void> {
   private static final Logger LOGGER = LoggerFactory.getLogger(PinotTaskManager.class);
 
+  public final static String PINOT_TASK_MANAGER_KEY = "PinotTaskManager";
+  public final static String LEAD_CONTROLLER_MANAGER_KEY = "LeadControllerManager";
+  private static final String TABLE_CONFIG_PARENT_PATH = "/CONFIGS/TABLE";
+  private static final String TABLE_CONFIG_PATH_PREFIX = "/CONFIGS/TABLE/";
+
+  private final static String SCHEDULE_KEY = "schedule";
+  private final static String TABLE_TASK_TYPE_SPLIT = "\t\t";
+
   private final PinotHelixTaskResourceManager _helixTaskResourceManager;
   private final ClusterInfoAccessor _clusterInfoAccessor;
   private final TaskGeneratorRegistry _taskGeneratorRegistry;
+  private final Map<String, Map<String, String>> _tableTaskTypeToCronExpressionMap = new ConcurrentHashMap<>();
+  private final Map<String, TableTaskSchedulerUpdater> _tableTaskSchedulerUpdaterMap = new ConcurrentHashMap<>();
+
+  private Scheduler _scheduledExecutorService = null;
 
   public PinotTaskManager(PinotHelixTaskResourceManager helixTaskResourceManager,
       PinotHelixResourceManager helixResourceManager, LeadControllerManager leadControllerManager,
@@ -62,6 +89,254 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> {
     _helixTaskResourceManager = helixTaskResourceManager;
     _clusterInfoAccessor = new ClusterInfoAccessor(helixResourceManager, helixTaskResourceManager, controllerConf);
     _taskGeneratorRegistry = new TaskGeneratorRegistry(_clusterInfoAccessor);
+    try {
+      _scheduledExecutorService = new StdSchedulerFactory().getScheduler();
+      _scheduledExecutorService.start();
+      LOGGER.info("Subscribe to tables change under PropertyStore path: {}", TABLE_CONFIG_PARENT_PATH);
+      _pinotHelixResourceManager.getPropertyStore()
+          .subscribeChildChanges(TABLE_CONFIG_PARENT_PATH, (parentPath, currentChilds) -> {
+            for (String tableWithType : currentChilds) {
+              subscribeTableConfigChanges(tableWithType);
+            }
+            Set<String> tableToDelete = new HashSet(_tableTaskSchedulerUpdaterMap.keySet());
+            tableToDelete.removeAll(currentChilds);
+            if (!tableToDelete.isEmpty()) {
+              LOGGER.info("Found tables to delete: {}", tableToDelete);
+              for (String tableWithType : tableToDelete) {
+                cleanUpCronTaskSchedulerForTable(tableWithType);
+              }
+            }
+          });
+      for (String tableWithType : helixResourceManager.getAllTables()) {
+        subscribeTableConfigChanges(tableWithType);
+      }
+    } catch (SchedulerException e) {
+      LOGGER.error("Unable to create a scheduler.", e);
+    }
+  }
+
+  private String getPropertyStorePathForTable(String tableWithType) {
+    return TABLE_CONFIG_PATH_PREFIX + tableWithType;
+  }
+
+  protected synchronized void cleanupCronTaskScheduler() {
+    try {
+      _scheduledExecutorService.clear();
+    } catch (SchedulerException e) {
+      LOGGER.error("Failed to clear all tasks in scheduler", e);
+    }
+  }
+
+  public synchronized void cleanUpCronTaskSchedulerForTable(String tableWithType) {
+    LOGGER.info("Cleaning up task in scheduler for table {}", tableWithType);
+    TableTaskSchedulerUpdater tableTaskSchedulerUpdater = _tableTaskSchedulerUpdaterMap.get(tableWithType);
+    _pinotHelixResourceManager.getPropertyStore()
+        .unsubscribeDataChanges(getPropertyStorePathForTable(tableWithType), tableTaskSchedulerUpdater);
+    for (String taskType : _tableTaskTypeToCronExpressionMap.get(tableWithType).keySet()) {
+      try {
+        _scheduledExecutorService.deleteJob(JobKey.jobKey(tableWithType, taskType));
+      } catch (SchedulerException e) {
+        LOGGER.error("Failed to delete job for table {}, task type {}", tableWithType, taskType, e);
+      }
+    }
+    _tableTaskSchedulerUpdaterMap.remove(tableWithType);
+  }
+
+  public synchronized void subscribeTableConfigChanges(String tableWithType) {
+    LOGGER.info("Trying to subscribeTableConfigChanges: {}", getPropertyStorePathForTable(tableWithType));
+    if (_tableTaskSchedulerUpdaterMap.containsKey(tableWithType)) {
+      LOGGER.info("Skipping subscribeTableConfigChanges: {}", getPropertyStorePathForTable(tableWithType));
+      return;
+    }
+    TableTaskSchedulerUpdater tableTaskSchedulerUpdater = new TableTaskSchedulerUpdater(tableWithType, this);
+    LOGGER.info("subscribeTableConfigChanges: {}", getPropertyStorePathForTable(tableWithType));
+    _pinotHelixResourceManager.getPropertyStore()
+        .subscribeDataChanges(getPropertyStorePathForTable(tableWithType), tableTaskSchedulerUpdater);
+    _tableTaskSchedulerUpdaterMap.put(tableWithType, tableTaskSchedulerUpdater);
+    LOGGER.info("Finished subscribeTableConfigChanges: {}", getPropertyStorePathForTable(tableWithType));
+    LOGGER.info("Trying to update task schedule for table: {}", tableWithType);
+    try {
+      if (_scheduledExecutorService != null) {
+        updateCronTaskScheduler(tableWithType);
+      }
+    } catch (Exception e) {
+      LOGGER.error("Got exception during updateCronTaskScheduler for {}", tableWithType, e);
+    }
+  }
+
+  public synchronized void updateCronTaskScheduler(String tableWithType) {
+    TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableWithType);
+    TableTaskConfig taskConfig = tableConfig.getTaskConfig();
+    if (taskConfig == null) {
+      LOGGER.info("taskConfig is null, trying to remove all the tasks for table {} if any", tableWithType);
+      removeAllTasksFromCronExpressions(tableWithType);
+      return;
+    }
+    Map<String, Map<String, String>> taskTypeConfigsMap = taskConfig.getTaskTypeConfigsMap();
+    if (taskTypeConfigsMap == null) {
+      LOGGER.info("taskTypeConfigsMap is null, trying to remove all the tasks for table {} if any", tableWithType);
+      removeAllTasksFromCronExpressions(tableWithType);
+      return;
+    }
+    Map<String, String> taskToCronExpressionMap = getTaskToCronExpressionMap(taskTypeConfigsMap);
+    LOGGER.info("Got taskToCronExpressionMap {} ", taskToCronExpressionMap);
+    updateCronTaskScheduler(tableWithType, taskToCronExpressionMap);
+  }
+
+  private void updateCronTaskScheduler(String tableWithType, Map<String, String> taskToCronExpressionMap) {
+    if (_tableTaskTypeToCronExpressionMap.containsKey(tableWithType)) {
+      Map<String, String> existingScheduledTasks = _tableTaskTypeToCronExpressionMap.get(tableWithType);
+      for (String existingTaskType : existingScheduledTasks.keySet()) {
+        // Task should be removed
+        if (!taskToCronExpressionMap.containsKey(existingTaskType)) {
+          try {
+            _scheduledExecutorService.deleteJob(JobKey.jobKey(tableWithType, existingTaskType));
+          } catch (SchedulerException e) {
+            LOGGER.error("Failed to delete scheduled job for table {}, task type {}", tableWithType,
+                existingScheduledTasks, e);
+          }
+          continue;
+        }
+        String existingCronExpression = existingScheduledTasks.get(existingTaskType);
+        String newCronExpression = taskToCronExpressionMap.get(existingTaskType);
+        // Schedule new job
+        if (existingCronExpression == null) {
+          try {
+            scheduleJob(tableWithType, existingTaskType, newCronExpression);
+          } catch (SchedulerException e) {
+            LOGGER.error("Failed to schedule cron task for table {}, task {}, cron expr {}", tableWithType,
+                existingTaskType, newCronExpression, e);
+          }
+          continue;
+        }
+        // Update existing task with new cron expr
+        if (!existingCronExpression.equalsIgnoreCase(newCronExpression)) {
+          try {
+            TriggerKey triggerKey = TriggerKey.triggerKey(tableWithType, existingTaskType);
+            Trigger trigger = TriggerBuilder.newTrigger().withIdentity(triggerKey)
+                .withSchedule(CronScheduleBuilder.cronSchedule(newCronExpression)).build();
+            _scheduledExecutorService.rescheduleJob(triggerKey, trigger);
+          } catch (SchedulerException e) {
+            LOGGER.error("Failed to delete scheduled job for table {}, task type {}", tableWithType,
+                existingScheduledTasks, e);
+          }
+          continue;
+        }
+      }
+    } else {
+      for (String taskType : taskToCronExpressionMap.keySet()) {
+        // Schedule new job
+        String cronExpr = taskToCronExpressionMap.get(taskType);
+        try {
+          scheduleJob(tableWithType, taskType, cronExpr);
+        } catch (SchedulerException e) {
+          LOGGER.error("Failed to schedule cron task for table {}, task {}, cron expr {}", tableWithType, taskType,
+              cronExpr, e);
+        }
+      }
+    }
+    _tableTaskTypeToCronExpressionMap.put(tableWithType, taskToCronExpressionMap);
+  }
+
+  private void scheduleJob(String tableWithType, String taskType, String cronExprStr)
+      throws SchedulerException {
+    boolean exists = false;
+    try {
+      exists = _scheduledExecutorService.checkExists(JobKey.jobKey(tableWithType, taskType));
+    } catch (SchedulerException e) {
+      LOGGER.error("Failed to check job existence for job key - table: {}, task: {} ", tableWithType, taskType, e);
+    }
+    if (!exists) {
+      LOGGER
+          .info("Trying to put cron expression: {} for table {}, task type: {}", cronExprStr, tableWithType, taskType);
+      Trigger trigger = TriggerBuilder.newTrigger().withIdentity(TriggerKey.triggerKey(tableWithType, taskType))
+          .withSchedule(CronScheduleBuilder.cronSchedule(cronExprStr)).build();
+      JobDataMap jobDataMap = new JobDataMap();
+      jobDataMap.put(PINOT_TASK_MANAGER_KEY, this);
+      jobDataMap.put(LEAD_CONTROLLER_MANAGER_KEY, this._leadControllerManager);
+      JobDetail jobDetail =
+          JobBuilder.newJob(CronJobScheduleJob.class).withIdentity(tableWithType, taskType).setJobData(jobDataMap)
+              .build();
+      try {
+        _scheduledExecutorService.scheduleJob(jobDetail, trigger);
+      } catch (Exception e) {
+        LOGGER.error("Failed to parse Cron expression - " + cronExprStr, e);
+        throw e;
+      }
+      Date nextRuntime = trigger.getNextFireTime();
+      LOGGER
+          .info("Scheduled task for table: {}, task type: {}, next runtime: {}", tableWithType, taskType, nextRuntime);
+    }
+  }
+
+  private Map<String, String> getTaskToCronExpressionMap(Map<String, Map<String, String>> taskTypeConfigsMap) {
+    Map<String, String> taskToCronExpressionMap = new HashMap<>();
+    for (String taskType : taskTypeConfigsMap.keySet()) {
+      Map<String, String> taskTypeConfig = taskTypeConfigsMap.get(taskType);
+      if (taskTypeConfig == null || !taskTypeConfig.containsKey(SCHEDULE_KEY)) {
+        continue;
+      }
+      String cronExprStr = taskTypeConfig.get(SCHEDULE_KEY);
+      if (cronExprStr == null) {
+        continue;
+      }
+      taskToCronExpressionMap.put(taskType, cronExprStr);
+    }
+    return taskToCronExpressionMap;
+  }
+
+  private void removeAllTasksFromCronExpressions(String tableWithType) {
+    Map<String, String> toRemove = _tableTaskTypeToCronExpressionMap.remove(tableWithType);
+    if (toRemove == null) {
+      return;
+    }
+    for (String taskType : toRemove.keySet()) {
+      try {
+        _scheduledExecutorService.deleteJob(JobKey.jobKey(tableWithType, taskType));
+      } catch (SchedulerException e) {
+        LOGGER.error("Got exception when deleting the scheduled job - table {}, task type {}", tableWithType, taskType,
+            e);
+      }
+    }
+  }
+
+  public Map<String, Map<String, Date>> getTableTaskTypeToNextRuntimeMap()
+      throws SchedulerException {
+    Map<String, Map<String, Date>> tableTaskTypeToNextRuntimeMap = new HashMap<>();
+    for (String table : _tableTaskTypeToCronExpressionMap.keySet()) {
+      for (String taskType : _tableTaskTypeToCronExpressionMap.get(table).keySet()) {
+        TriggerKey triggerKey = TriggerKey.triggerKey(table, taskType);
+        LOGGER.info("Trying to get trigger for trigger key: {}", triggerKey);
+        Trigger trigger = _scheduledExecutorService.getTrigger(triggerKey);
+        if (trigger == null || trigger.getJobKey() == null) {
+          continue;
+        }
+        Date nextFireTime = trigger.getNextFireTime();
+        if (!tableTaskTypeToNextRuntimeMap.containsKey(table)) {
+          tableTaskTypeToNextRuntimeMap.put(table, new HashMap<>());
+        }
+        tableTaskTypeToNextRuntimeMap.get(table).put(taskType, nextFireTime);
+      }
+    }
+    LOGGER.info("getTableTaskTypeToNextRuntimeMap: {}", tableTaskTypeToNextRuntimeMap);
+    return tableTaskTypeToNextRuntimeMap;
+  }
+
+  public Map<String, Map<String, String>> getTableTaskTypeToCronExpressionMap() {
+    LOGGER.info("getTableTaskTypeToCronExpressionMap: {}", _tableTaskTypeToCronExpressionMap);
+    return _tableTaskTypeToCronExpressionMap;
+  }
+
+  public static String getTableTaskType(String tableWithType, String taskType) {
+    return tableWithType + TABLE_TASK_TYPE_SPLIT + taskType;
+  }
+
+  public static String getTableNameFromTableTaskType(String tableTaskType) {
+    return tableTaskType.split(TABLE_TASK_TYPE_SPLIT, 2)[0];
+  }
+
+  public static String getTaskTypeFromTableTaskType(String tableTaskType) {
+    return tableTaskType.split(TABLE_TASK_TYPE_SPLIT, 2)[1];
   }
 
   /**
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/TableTaskSchedulerUpdater.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/TableTaskSchedulerUpdater.java
new file mode 100644
index 0000000..3381242
--- /dev/null
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/TableTaskSchedulerUpdater.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.minion;
+
+import org.I0Itec.zkclient.IZkDataListener;
+
+
+public class TableTaskSchedulerUpdater implements IZkDataListener {
+  private final String _tableWithType;
+  private final PinotTaskManager _pinotTaskManager;
+
+  public TableTaskSchedulerUpdater(String tableWithType, PinotTaskManager pinotTaskManager) {
+    _tableWithType = tableWithType;
+    _pinotTaskManager = pinotTaskManager;
+  }
+
+  @Override
+  public void handleDataChange(String dataPath, Object data)
+      throws Exception {
+    _pinotTaskManager.updateCronTaskScheduler(_tableWithType);
+  }
+
+  @Override
+  public void handleDataDeleted(String dataPath)
+      throws Exception {
+    _pinotTaskManager.cleanUpCronTaskSchedulerForTable(_tableWithType);
+  }
+}
diff --git a/pom.xml b/pom.xml
index 7711a57..f9f682c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -128,6 +128,7 @@
     <scala.version>2.11.11</scala.version>
     <antlr.version>4.6</antlr.version>
     <jsonpath.version>2.4.0</jsonpath.version>
+    <quartz.version>2.3.0</quartz.version>
     <calcite.version>1.19.0</calcite.version>
     <lucene.version>8.2.0</lucene.version>
     <reflections.version>0.9.11</reflections.version>
@@ -510,6 +511,11 @@
         <version>2.0.1</version>
       </dependency>
       <dependency>
+        <groupId>org.quartz-scheduler</groupId>
+        <artifactId>quartz</artifactId>
+        <version>${quartz.version}</version>
+      </dependency>
+      <dependency>
         <groupId>javax.validation</groupId>
         <artifactId>validation-api</artifactId>
         <version>2.0.1.Final</version>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org