You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by xi...@apache.org on 2021/01/17 22:31:23 UTC
[incubator-pinot] 01/01: Adding task scheduler for pinot tasks in
cron expression
This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch adding_task_scheduler
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 29072007dd1ca2d5ef379cd5290bd90363f48f9b
Author: Xiang Fu <fx...@gmail.com>
AuthorDate: Sun Jan 17 14:30:55 2021 -0800
Adding task scheduler for pinot tasks in cron expression
---
pinot-controller/pom.xml | 4 +
.../api/resources/PinotTaskRestletResource.java | 60 +++++
.../helix/core/minion/CronJobScheduleJob.java | 54 ++++
.../helix/core/minion/PinotTaskManager.java | 275 +++++++++++++++++++++
.../core/minion/TableTaskSchedulerUpdater.java | 44 ++++
pom.xml | 6 +
6 files changed, 443 insertions(+)
diff --git a/pinot-controller/pom.xml b/pinot-controller/pom.xml
index 1bd4890..9746a0e 100644
--- a/pinot-controller/pom.xml
+++ b/pinot-controller/pom.xml
@@ -204,6 +204,10 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.quartz-scheduler</groupId>
+ <artifactId>quartz</artifactId>
+ </dependency>
</dependencies>
<build>
<resources>
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java
index aa3235e..3e021c3 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java
@@ -22,6 +22,8 @@ import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -38,6 +40,7 @@ import org.apache.helix.task.TaskState;
import org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager;
import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
import org.apache.pinot.core.minion.PinotTaskConfig;
+import org.quartz.SchedulerException;
/**
@@ -167,6 +170,63 @@ public class PinotTaskRestletResource {
return _pinotHelixTaskResourceManager.getTaskConfigs(taskName);
}
+ @GET
+ @Path("/tasks/cron/schedules")
+ @ApiOperation("Fetch cron tasks schedule")
+ public Map<String, Map<String, String>> getCronSchedules(
+ @ApiParam(value = "Task type") @QueryParam("taskType") String taskType,
+ @ApiParam(value = "Table name (with type suffix)") @QueryParam("tableName") String tableName) {
+ Map<String, Map<String, String>> tableTaskTypeToCronExpressionMap = _pinotTaskManager.getTableTaskTypeToCronExpressionMap();
+ Map<String, Map<String, String>> results = new HashMap<>();
+ for (String table : tableTaskTypeToCronExpressionMap.keySet()) {
+ if (tableName != null && !table.equalsIgnoreCase(table)) {
+ continue;
+ }
+ Map<String, String> taskTypeToCronExpressionMap = tableTaskTypeToCronExpressionMap.get(table);
+ for (String task : taskTypeToCronExpressionMap.keySet()) {
+ if (taskType != null && !task.equalsIgnoreCase(taskType)) {
+ continue;
+ }
+ if (!results.containsKey(table)) {
+ results.put(table, new HashMap<>());
+ }
+ results.get(table).put(task, taskTypeToCronExpressionMap.get(task));
+ }
+ }
+ return results;
+ }
+
+ @GET
+ @Path("/tasks/cron/nextruntimes")
+ @ApiOperation("Fetch cron tasks next runtimes")
+ public Map<String, Map<String, Date>> getCronNextRuntimes(
+ @ApiParam(value = "Task type") @QueryParam("taskType") String taskType,
+ @ApiParam(value = "Table name (with type suffix)") @QueryParam("tableName") String tableName) {
+ try {
+ Map<String, Map<String, Date>> tableTaskTypeToNextRuntimeMap =
+ _pinotTaskManager.getTableTaskTypeToNextRuntimeMap();
+ Map<String, Map<String, Date>> results = new HashMap<>();
+ for (String table : tableTaskTypeToNextRuntimeMap.keySet()) {
+ if (tableName != null && !table.equalsIgnoreCase(table)) {
+ continue;
+ }
+ Map<String, Date> taskTypeToNextRuntimeMap = tableTaskTypeToNextRuntimeMap.get(table);
+ for (String task : taskTypeToNextRuntimeMap.keySet()) {
+ if (taskType != null && !task.equalsIgnoreCase(taskType)) {
+ continue;
+ }
+ if (!results.containsKey(table)) {
+ results.put(table, new HashMap<>());
+ }
+ results.get(table).put(task, taskTypeToNextRuntimeMap.get(task));
+ }
+ }
+ return results;
+ } catch (SchedulerException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
@POST
@Path("/tasks/schedule")
@ApiOperation("Schedule tasks and return a map from task type to task name scheduled")
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/CronJobScheduleJob.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/CronJobScheduleJob.java
new file mode 100644
index 0000000..e5653f4
--- /dev/null
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/CronJobScheduleJob.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.minion;
+
+import org.apache.pinot.controller.LeadControllerManager;
+import org.quartz.Job;
+import org.quartz.JobExecutionContext;
+import org.quartz.JobExecutionException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class CronJobScheduleJob implements Job {
+ private static final Logger LOGGER = LoggerFactory.getLogger(CronJobScheduleJob.class);
+
+ public CronJobScheduleJob() {
+ }
+
+ @Override
+ public void execute(JobExecutionContext jobExecutionContext)
+ throws JobExecutionException {
+ PinotTaskManager pinotTaskManager = (PinotTaskManager) jobExecutionContext.getJobDetail().getJobDataMap()
+ .get(PinotTaskManager.PINOT_TASK_MANAGER_KEY);
+ LeadControllerManager leadControllerManager =
+ (LeadControllerManager) jobExecutionContext.getJobDetail().getJobDataMap()
+ .get(PinotTaskManager.LEAD_CONTROLLER_MANAGER_KEY);
+ String table = jobExecutionContext.getJobDetail().getKey().getName();
+ String taskType = jobExecutionContext.getJobDetail().getKey().getGroup();
+ if (leadControllerManager.isLeaderForTable(table)) {
+ LOGGER.info("Execute CronJob: table - {}, task - {} at {}", table, taskType, jobExecutionContext.getFireTime());
+ pinotTaskManager.scheduleTask(taskType, table);
+ LOGGER.info("Finished CronJob: table - {}, task - {}, next runtime is {}", table, taskType,
+ jobExecutionContext.getNextFireTime());
+ } else {
+ LOGGER.info("Not Lead, skip processing CronJob: table - {}, task - {}", table, taskType);
+ }
+ }
+}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
index fe97326..18906dc 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
@@ -21,10 +21,13 @@ package org.apache.pinot.controller.helix.core.minion;
import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.Date;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.Nullable;
import org.apache.pinot.common.metrics.ControllerMeter;
import org.apache.pinot.common.metrics.ControllerMetrics;
@@ -36,6 +39,18 @@ import org.apache.pinot.controller.helix.core.minion.generator.TaskGeneratorRegi
import org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
import org.apache.pinot.core.minion.PinotTaskConfig;
import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableTaskConfig;
+import org.quartz.CronScheduleBuilder;
+import org.quartz.JobBuilder;
+import org.quartz.JobDataMap;
+import org.quartz.JobDetail;
+import org.quartz.JobKey;
+import org.quartz.Scheduler;
+import org.quartz.SchedulerException;
+import org.quartz.Trigger;
+import org.quartz.TriggerBuilder;
+import org.quartz.TriggerKey;
+import org.quartz.impl.StdSchedulerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -49,9 +64,21 @@ import org.slf4j.LoggerFactory;
public class PinotTaskManager extends ControllerPeriodicTask<Void> {
private static final Logger LOGGER = LoggerFactory.getLogger(PinotTaskManager.class);
+ public final static String PINOT_TASK_MANAGER_KEY = "PinotTaskManager";
+ public final static String LEAD_CONTROLLER_MANAGER_KEY = "LeadControllerManager";
+ private static final String TABLE_CONFIG_PARENT_PATH = "/CONFIGS/TABLE";
+ private static final String TABLE_CONFIG_PATH_PREFIX = "/CONFIGS/TABLE/";
+
+ private final static String SCHEDULE_KEY = "schedule";
+ private final static String TABLE_TASK_TYPE_SPLIT = "\t\t";
+
private final PinotHelixTaskResourceManager _helixTaskResourceManager;
private final ClusterInfoAccessor _clusterInfoAccessor;
private final TaskGeneratorRegistry _taskGeneratorRegistry;
+ private final Map<String, Map<String, String>> _tableTaskTypeToCronExpressionMap = new ConcurrentHashMap<>();
+ private final Map<String, TableTaskSchedulerUpdater> _tableTaskSchedulerUpdaterMap = new ConcurrentHashMap<>();
+
+ private Scheduler _scheduledExecutorService = null;
public PinotTaskManager(PinotHelixTaskResourceManager helixTaskResourceManager,
PinotHelixResourceManager helixResourceManager, LeadControllerManager leadControllerManager,
@@ -62,6 +89,254 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> {
_helixTaskResourceManager = helixTaskResourceManager;
_clusterInfoAccessor = new ClusterInfoAccessor(helixResourceManager, helixTaskResourceManager, controllerConf);
_taskGeneratorRegistry = new TaskGeneratorRegistry(_clusterInfoAccessor);
+ try {
+ _scheduledExecutorService = new StdSchedulerFactory().getScheduler();
+ _scheduledExecutorService.start();
+ LOGGER.info("Subscribe to tables change under PropertyStore path: {}", TABLE_CONFIG_PARENT_PATH);
+ _pinotHelixResourceManager.getPropertyStore()
+ .subscribeChildChanges(TABLE_CONFIG_PARENT_PATH, (parentPath, currentChilds) -> {
+ for (String tableWithType : currentChilds) {
+ subscribeTableConfigChanges(tableWithType);
+ }
+ Set<String> tableToDelete = new HashSet(_tableTaskSchedulerUpdaterMap.keySet());
+ tableToDelete.removeAll(currentChilds);
+ if (!tableToDelete.isEmpty()) {
+ LOGGER.info("Found tables to delete: {}", tableToDelete);
+ for (String tableWithType : tableToDelete) {
+ cleanUpCronTaskSchedulerForTable(tableWithType);
+ }
+ }
+ });
+ for (String tableWithType : helixResourceManager.getAllTables()) {
+ subscribeTableConfigChanges(tableWithType);
+ }
+ } catch (SchedulerException e) {
+ LOGGER.error("Unable to create a scheduler.", e);
+ }
+ }
+
+ private String getPropertyStorePathForTable(String tableWithType) {
+ return TABLE_CONFIG_PATH_PREFIX + tableWithType;
+ }
+
+ protected synchronized void cleanupCronTaskScheduler() {
+ try {
+ _scheduledExecutorService.clear();
+ } catch (SchedulerException e) {
+ LOGGER.error("Failed to clear all tasks in scheduler", e);
+ }
+ }
+
+ public synchronized void cleanUpCronTaskSchedulerForTable(String tableWithType) {
+ LOGGER.info("Cleaning up task in scheduler for table {}", tableWithType);
+ TableTaskSchedulerUpdater tableTaskSchedulerUpdater = _tableTaskSchedulerUpdaterMap.get(tableWithType);
+ _pinotHelixResourceManager.getPropertyStore()
+ .unsubscribeDataChanges(getPropertyStorePathForTable(tableWithType), tableTaskSchedulerUpdater);
+ for (String taskType : _tableTaskTypeToCronExpressionMap.get(tableWithType).keySet()) {
+ try {
+ _scheduledExecutorService.deleteJob(JobKey.jobKey(tableWithType, taskType));
+ } catch (SchedulerException e) {
+ LOGGER.error("Failed to delete job for table {}, task type {}", tableWithType, taskType, e);
+ }
+ }
+ _tableTaskSchedulerUpdaterMap.remove(tableWithType);
+ }
+
+ public synchronized void subscribeTableConfigChanges(String tableWithType) {
+ LOGGER.info("Trying to subscribeTableConfigChanges: {}", getPropertyStorePathForTable(tableWithType));
+ if (_tableTaskSchedulerUpdaterMap.containsKey(tableWithType)) {
+ LOGGER.info("Skipping subscribeTableConfigChanges: {}", getPropertyStorePathForTable(tableWithType));
+ return;
+ }
+ TableTaskSchedulerUpdater tableTaskSchedulerUpdater = new TableTaskSchedulerUpdater(tableWithType, this);
+ LOGGER.info("subscribeTableConfigChanges: {}", getPropertyStorePathForTable(tableWithType));
+ _pinotHelixResourceManager.getPropertyStore()
+ .subscribeDataChanges(getPropertyStorePathForTable(tableWithType), tableTaskSchedulerUpdater);
+ _tableTaskSchedulerUpdaterMap.put(tableWithType, tableTaskSchedulerUpdater);
+ LOGGER.info("Finished subscribeTableConfigChanges: {}", getPropertyStorePathForTable(tableWithType));
+ LOGGER.info("Trying to update task schedule for table: {}", tableWithType);
+ try {
+ if (_scheduledExecutorService != null) {
+ updateCronTaskScheduler(tableWithType);
+ }
+ } catch (Exception e) {
+ LOGGER.error("Got exception during updateCronTaskScheduler for {}", tableWithType, e);
+ }
+ }
+
+ public synchronized void updateCronTaskScheduler(String tableWithType) {
+ TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableWithType);
+ TableTaskConfig taskConfig = tableConfig.getTaskConfig();
+ if (taskConfig == null) {
+ LOGGER.info("taskConfig is null, trying to remove all the tasks for table {} if any", tableWithType);
+ removeAllTasksFromCronExpressions(tableWithType);
+ return;
+ }
+ Map<String, Map<String, String>> taskTypeConfigsMap = taskConfig.getTaskTypeConfigsMap();
+ if (taskTypeConfigsMap == null) {
+ LOGGER.info("taskTypeConfigsMap is null, trying to remove all the tasks for table {} if any", tableWithType);
+ removeAllTasksFromCronExpressions(tableWithType);
+ return;
+ }
+ Map<String, String> taskToCronExpressionMap = getTaskToCronExpressionMap(taskTypeConfigsMap);
+ LOGGER.info("Got taskToCronExpressionMap {} ", taskToCronExpressionMap);
+ updateCronTaskScheduler(tableWithType, taskToCronExpressionMap);
+ }
+
+ private void updateCronTaskScheduler(String tableWithType, Map<String, String> taskToCronExpressionMap) {
+ if (_tableTaskTypeToCronExpressionMap.containsKey(tableWithType)) {
+ Map<String, String> existingScheduledTasks = _tableTaskTypeToCronExpressionMap.get(tableWithType);
+ for (String existingTaskType : existingScheduledTasks.keySet()) {
+ // Task should be removed
+ if (!taskToCronExpressionMap.containsKey(existingTaskType)) {
+ try {
+ _scheduledExecutorService.deleteJob(JobKey.jobKey(tableWithType, existingTaskType));
+ } catch (SchedulerException e) {
+ LOGGER.error("Failed to delete scheduled job for table {}, task type {}", tableWithType,
+ existingScheduledTasks, e);
+ }
+ continue;
+ }
+ String existingCronExpression = existingScheduledTasks.get(existingTaskType);
+ String newCronExpression = taskToCronExpressionMap.get(existingTaskType);
+ // Schedule new job
+ if (existingCronExpression == null) {
+ try {
+ scheduleJob(tableWithType, existingTaskType, newCronExpression);
+ } catch (SchedulerException e) {
+ LOGGER.error("Failed to schedule cron task for table {}, task {}, cron expr {}", tableWithType,
+ existingTaskType, newCronExpression, e);
+ }
+ continue;
+ }
+ // Update existing task with new cron expr
+ if (!existingCronExpression.equalsIgnoreCase(newCronExpression)) {
+ try {
+ TriggerKey triggerKey = TriggerKey.triggerKey(tableWithType, existingTaskType);
+ Trigger trigger = TriggerBuilder.newTrigger().withIdentity(triggerKey)
+ .withSchedule(CronScheduleBuilder.cronSchedule(newCronExpression)).build();
+ _scheduledExecutorService.rescheduleJob(triggerKey, trigger);
+ } catch (SchedulerException e) {
+ LOGGER.error("Failed to delete scheduled job for table {}, task type {}", tableWithType,
+ existingScheduledTasks, e);
+ }
+ continue;
+ }
+ }
+ } else {
+ for (String taskType : taskToCronExpressionMap.keySet()) {
+ // Schedule new job
+ String cronExpr = taskToCronExpressionMap.get(taskType);
+ try {
+ scheduleJob(tableWithType, taskType, cronExpr);
+ } catch (SchedulerException e) {
+ LOGGER.error("Failed to schedule cron task for table {}, task {}, cron expr {}", tableWithType, taskType,
+ cronExpr, e);
+ }
+ }
+ }
+ _tableTaskTypeToCronExpressionMap.put(tableWithType, taskToCronExpressionMap);
+ }
+
+ private void scheduleJob(String tableWithType, String taskType, String cronExprStr)
+ throws SchedulerException {
+ boolean exists = false;
+ try {
+ exists = _scheduledExecutorService.checkExists(JobKey.jobKey(tableWithType, taskType));
+ } catch (SchedulerException e) {
+ LOGGER.error("Failed to check job existence for job key - table: {}, task: {} ", tableWithType, taskType, e);
+ }
+ if (!exists) {
+ LOGGER
+ .info("Trying to put cron expression: {} for table {}, task type: {}", cronExprStr, tableWithType, taskType);
+ Trigger trigger = TriggerBuilder.newTrigger().withIdentity(TriggerKey.triggerKey(tableWithType, taskType))
+ .withSchedule(CronScheduleBuilder.cronSchedule(cronExprStr)).build();
+ JobDataMap jobDataMap = new JobDataMap();
+ jobDataMap.put(PINOT_TASK_MANAGER_KEY, this);
+ jobDataMap.put(LEAD_CONTROLLER_MANAGER_KEY, this._leadControllerManager);
+ JobDetail jobDetail =
+ JobBuilder.newJob(CronJobScheduleJob.class).withIdentity(tableWithType, taskType).setJobData(jobDataMap)
+ .build();
+ try {
+ _scheduledExecutorService.scheduleJob(jobDetail, trigger);
+ } catch (Exception e) {
+ LOGGER.error("Failed to parse Cron expression - " + cronExprStr, e);
+ throw e;
+ }
+ Date nextRuntime = trigger.getNextFireTime();
+ LOGGER
+ .info("Scheduled task for table: {}, task type: {}, next runtime: {}", tableWithType, taskType, nextRuntime);
+ }
+ }
+
+ private Map<String, String> getTaskToCronExpressionMap(Map<String, Map<String, String>> taskTypeConfigsMap) {
+ Map<String, String> taskToCronExpressionMap = new HashMap<>();
+ for (String taskType : taskTypeConfigsMap.keySet()) {
+ Map<String, String> taskTypeConfig = taskTypeConfigsMap.get(taskType);
+ if (taskTypeConfig == null || !taskTypeConfig.containsKey(SCHEDULE_KEY)) {
+ continue;
+ }
+ String cronExprStr = taskTypeConfig.get(SCHEDULE_KEY);
+ if (cronExprStr == null) {
+ continue;
+ }
+ taskToCronExpressionMap.put(taskType, cronExprStr);
+ }
+ return taskToCronExpressionMap;
+ }
+
+ private void removeAllTasksFromCronExpressions(String tableWithType) {
+ Map<String, String> toRemove = _tableTaskTypeToCronExpressionMap.remove(tableWithType);
+ if (toRemove == null) {
+ return;
+ }
+ for (String taskType : toRemove.keySet()) {
+ try {
+ _scheduledExecutorService.deleteJob(JobKey.jobKey(tableWithType, taskType));
+ } catch (SchedulerException e) {
+ LOGGER.error("Got exception when deleting the scheduled job - table {}, task type {}", tableWithType, taskType,
+ e);
+ }
+ }
+ }
+
+ public Map<String, Map<String, Date>> getTableTaskTypeToNextRuntimeMap()
+ throws SchedulerException {
+ Map<String, Map<String, Date>> tableTaskTypeToNextRuntimeMap = new HashMap<>();
+ for (String table : _tableTaskTypeToCronExpressionMap.keySet()) {
+ for (String taskType : _tableTaskTypeToCronExpressionMap.get(table).keySet()) {
+ TriggerKey triggerKey = TriggerKey.triggerKey(table, taskType);
+ LOGGER.info("Trying to get trigger for trigger key: {}", triggerKey);
+ Trigger trigger = _scheduledExecutorService.getTrigger(triggerKey);
+ if (trigger == null || trigger.getJobKey() == null) {
+ continue;
+ }
+ Date nextFireTime = trigger.getNextFireTime();
+ if (!tableTaskTypeToNextRuntimeMap.containsKey(table)) {
+ tableTaskTypeToNextRuntimeMap.put(table, new HashMap<>());
+ }
+ tableTaskTypeToNextRuntimeMap.get(table).put(taskType, nextFireTime);
+ }
+ }
+ LOGGER.info("getTableTaskTypeToNextRuntimeMap: {}", tableTaskTypeToNextRuntimeMap);
+ return tableTaskTypeToNextRuntimeMap;
+ }
+
+ public Map<String, Map<String, String>> getTableTaskTypeToCronExpressionMap() {
+ LOGGER.info("getTableTaskTypeToCronExpressionMap: {}", _tableTaskTypeToCronExpressionMap);
+ return _tableTaskTypeToCronExpressionMap;
+ }
+
+ public static String getTableTaskType(String tableWithType, String taskType) {
+ return tableWithType + TABLE_TASK_TYPE_SPLIT + taskType;
+ }
+
+ public static String getTableNameFromTableTaskType(String tableTaskType) {
+ return tableTaskType.split(TABLE_TASK_TYPE_SPLIT, 2)[0];
+ }
+
+ public static String getTaskTypeFromTableTaskType(String tableTaskType) {
+ return tableTaskType.split(TABLE_TASK_TYPE_SPLIT, 2)[1];
}
/**
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/TableTaskSchedulerUpdater.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/TableTaskSchedulerUpdater.java
new file mode 100644
index 0000000..3381242
--- /dev/null
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/TableTaskSchedulerUpdater.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.minion;
+
+import org.I0Itec.zkclient.IZkDataListener;
+
+
+public class TableTaskSchedulerUpdater implements IZkDataListener {
+ private final String _tableWithType;
+ private final PinotTaskManager _pinotTaskManager;
+
+ public TableTaskSchedulerUpdater(String tableWithType, PinotTaskManager pinotTaskManager) {
+ _tableWithType = tableWithType;
+ _pinotTaskManager = pinotTaskManager;
+ }
+
+ @Override
+ public void handleDataChange(String dataPath, Object data)
+ throws Exception {
+ _pinotTaskManager.updateCronTaskScheduler(_tableWithType);
+ }
+
+ @Override
+ public void handleDataDeleted(String dataPath)
+ throws Exception {
+ _pinotTaskManager.cleanUpCronTaskSchedulerForTable(_tableWithType);
+ }
+}
diff --git a/pom.xml b/pom.xml
index 7711a57..f9f682c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -128,6 +128,7 @@
<scala.version>2.11.11</scala.version>
<antlr.version>4.6</antlr.version>
<jsonpath.version>2.4.0</jsonpath.version>
+ <quartz.version>2.3.0</quartz.version>
<calcite.version>1.19.0</calcite.version>
<lucene.version>8.2.0</lucene.version>
<reflections.version>0.9.11</reflections.version>
@@ -510,6 +511,11 @@
<version>2.0.1</version>
</dependency>
<dependency>
+ <groupId>org.quartz-scheduler</groupId>
+ <artifactId>quartz</artifactId>
+ <version>${quartz.version}</version>
+ </dependency>
+ <dependency>
<groupId>javax.validation</groupId>
<artifactId>validation-api</artifactId>
<version>2.0.1.Final</version>
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org