You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by xi...@apache.org on 2022/04/13 22:56:34 UTC
[pinot] branch master updated: Add adhoc minion task creation endpoint (#8465)
This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 58ffe947b6 Add adhoc minion task creation endpoint (#8465)
58ffe947b6 is described below
commit 58ffe947b6c6509697fca8943282906e76351c90
Author: Xiang Fu <xi...@gmail.com>
AuthorDate: Wed Apr 13 15:56:27 2022 -0700
Add adhoc minion task creation endpoint (#8465)
* Support submitting SegmentGenerationAndPushTask in an ad hoc manner
1. Add an ad hoc task creation endpoint to PinotTaskRestletResource
2. Add the ad hoc minion task submission endpoint and its implementation for SegmentGenerationAndPushTask
* Address comments
---
.../pinot/common/metrics/ControllerMeter.java | 3 +-
.../api/exception/NoTaskScheduledException.java | 29 +++++++
.../api/exception/TaskAlreadyExistsException.java | 29 +++++++
.../api/exception/UnknownTaskTypeException.java | 29 +++++++
.../api/resources/PinotTaskRestletResource.java | 40 +++++++++
.../core/minion/PinotHelixTaskResourceManager.java | 36 +++++++-
.../helix/core/minion/PinotTaskManager.java | 72 ++++++++++++++++
.../core/minion/generator/BaseTaskGenerator.java | 11 +++
.../core/minion/generator/PinotTaskGenerator.java | 7 ++
.../SegmentGenerationAndPushTaskGenerator.java | 97 ++++++++++++++++++++--
.../pinot/spi/config/task/AdhocTaskConfig.java | 81 ++++++++++++++++++
.../pinot/spi/config/task/AdhocTaskConfigTest.java | 43 ++++++++++
12 files changed, 469 insertions(+), 8 deletions(-)
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMeter.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMeter.java
index 55f194856c..082893b7ff 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMeter.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMeter.java
@@ -50,7 +50,8 @@ public enum ControllerMeter implements AbstractMetrics.Meter {
NUMBER_TASKS_SUBMITTED("tasks", false),
NUMBER_SEGMENT_UPLOAD_TIMEOUT_EXCEEDED("SegmentUploadTimeouts", true),
CRON_SCHEDULER_JOB_TRIGGERED("cronSchedulerJobTriggered", false),
- LLC_SEGMENTS_DEEP_STORE_UPLOAD_RETRY_ERROR("LLCSegmentDeepStoreUploadRetryError", false);
+ LLC_SEGMENTS_DEEP_STORE_UPLOAD_RETRY_ERROR("LLCSegmentDeepStoreUploadRetryError", false),
+ NUMBER_ADHOC_TASKS_SUBMITTED("adhocTasks", false);
private final String _brokerMeterName;
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/exception/NoTaskScheduledException.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/exception/NoTaskScheduledException.java
new file mode 100644
index 0000000000..7a986b8a97
--- /dev/null
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/exception/NoTaskScheduledException.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api.exception;
+
+/**
+ * Signals that a request resolved to valid targets but no task ended up being scheduled, e.g. an ad-hoc task request
+ * whose tables all produced an empty list of task configs.
+ */
+public class NoTaskScheduledException extends RuntimeException {
+  /**
+   * @param message description of why nothing was scheduled
+   * @param cause underlying failure
+   */
+  public NoTaskScheduledException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+  /**
+   * @param message description of why nothing was scheduled
+   */
+  public NoTaskScheduledException(String message) {
+    super(message);
+  }
+}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/exception/TaskAlreadyExistsException.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/exception/TaskAlreadyExistsException.java
new file mode 100644
index 0000000000..941cd2a767
--- /dev/null
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/exception/TaskAlreadyExistsException.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api.exception;
+
+/**
+ * Signals that a task with the requested name already exists; for ad-hoc tasks this means a task of the same type and
+ * name is already known to Helix.
+ */
+public class TaskAlreadyExistsException extends RuntimeException {
+  /**
+   * @param message description of the conflicting task
+   * @param cause underlying failure
+   */
+  public TaskAlreadyExistsException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+  /**
+   * @param message description of the conflicting task
+   */
+  public TaskAlreadyExistsException(String message) {
+    super(message);
+  }
+}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/exception/UnknownTaskTypeException.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/exception/UnknownTaskTypeException.java
new file mode 100644
index 0000000000..a283335483
--- /dev/null
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/exception/UnknownTaskTypeException.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api.exception;
+
+/**
+ * Signals that a task type cannot serve the request: it is not registered, has no Helix workflow context, or its
+ * generator does not support the requested kind of task generation.
+ */
+public class UnknownTaskTypeException extends RuntimeException {
+  /**
+   * @param message description of the offending task type
+   * @param cause underlying failure
+   */
+  public UnknownTaskTypeException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+  /**
+   * @param message description of the offending task type
+   */
+  public UnknownTaskTypeException(String message) {
+    super(message);
+  }
+}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java
index 12b16edca6..0e52c5bbc4 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java
@@ -38,13 +38,21 @@ import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.Response;
+import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.helix.task.TaskPartitionState;
import org.apache.helix.task.TaskState;
+import org.apache.pinot.common.exception.TableNotFoundException;
import org.apache.pinot.controller.api.access.AccessType;
import org.apache.pinot.controller.api.access.Authenticate;
+import org.apache.pinot.controller.api.exception.ControllerApplicationException;
+import org.apache.pinot.controller.api.exception.NoTaskScheduledException;
+import org.apache.pinot.controller.api.exception.TaskAlreadyExistsException;
+import org.apache.pinot.controller.api.exception.UnknownTaskTypeException;
import org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager;
import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
import org.apache.pinot.core.minion.PinotTaskConfig;
+import org.apache.pinot.spi.config.task.AdhocTaskConfig;
import org.quartz.CronTrigger;
import org.quartz.JobDataMap;
import org.quartz.JobDetail;
@@ -55,6 +63,8 @@ import org.quartz.SchedulerMetaData;
import org.quartz.SimpleTrigger;
import org.quartz.Trigger;
import org.quartz.impl.matchers.GroupMatcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
@@ -67,6 +77,7 @@ import org.quartz.impl.matchers.GroupMatcher;
* <li>GET '/tasks/task/{taskName}/state': Get the task state for the given task</li>
* <li>GET '/tasks/task/{taskName}/config': Get the task config (a list of child task configs) for the given task</li>
* <li>POST '/tasks/schedule': Schedule tasks</li>
+ * <li>POST '/tasks/execute': Execute an adhoc task</li>
* <li>PUT '/tasks/{taskType}/cleanup': Clean up finished tasks (COMPLETED, FAILED) for the given task type</li>
* <li>PUT '/tasks/{taskType}/stop': Stop all running/pending tasks (as well as the task queue) for the given task
* type</li>
@@ -78,6 +89,8 @@ import org.quartz.impl.matchers.GroupMatcher;
@Api(tags = Constants.TASK_TAG)
@Path("/")
public class PinotTaskRestletResource {
+ public static final Logger LOGGER = LoggerFactory.getLogger(PinotTaskRestletResource.class);
+
private static final String TASK_QUEUE_STATE_STOP = "STOP";
private static final String TASK_QUEUE_STATE_RESUME = "RESUME";
@@ -366,6 +379,33 @@ public class PinotTaskRestletResource {
}
}
+  /**
+   * Creates and submits an ad-hoc minion task described by {@code adhocTaskConfig}.
+   *
+   * <p>Failures are mapped to HTTP statuses: table not found or unknown task type -> 404, task name already in use ->
+   * 409, no task generated -> 400, anything else -> 500.
+   *
+   * @param adhocTaskConfig task type, table name, optional task name and task configs
+   * @return map from table name with type to the submitted parent task name
+   */
+  @POST
+  @Path("/tasks/execute")
+  @Authenticate(AccessType.CREATE)
+  @ApiOperation("Execute a task on minion")
+  public Map<String, String> executeAdhocTask(AdhocTaskConfig adhocTaskConfig) {
+    try {
+      return _pinotTaskManager.createTask(adhocTaskConfig.getTaskType(), adhocTaskConfig.getTableName(),
+          adhocTaskConfig.getTaskName(), adhocTaskConfig.getTaskConfigs());
+    } catch (TableNotFoundException e) {
+      throw new ControllerApplicationException(LOGGER, "Failed to find table: " + adhocTaskConfig.getTableName(),
+          Response.Status.NOT_FOUND, e);
+    } catch (TaskAlreadyExistsException e) {
+      throw new ControllerApplicationException(LOGGER, "Task already exists: " + adhocTaskConfig.getTaskName(),
+          Response.Status.CONFLICT, e);
+    } catch (UnknownTaskTypeException e) {
+      throw new ControllerApplicationException(LOGGER, "Unknown task type: " + adhocTaskConfig.getTaskType(),
+          Response.Status.NOT_FOUND, e);
+    } catch (NoTaskScheduledException e) {
+      // Pass the cause through, consistent with every other branch, so the underlying reason is not lost.
+      throw new ControllerApplicationException(LOGGER,
+          "No task is generated for table: " + adhocTaskConfig.getTableName() + ", with task type: "
+              + adhocTaskConfig.getTaskType(), Response.Status.BAD_REQUEST, e);
+    } catch (Exception e) {
+      throw new ControllerApplicationException(LOGGER,
+          "Failed to create adhoc task: " + ExceptionUtils.getStackTrace(e), Response.Status.INTERNAL_SERVER_ERROR, e);
+    }
+  }
+
@Deprecated
@PUT
@Path("/tasks/scheduletasks")
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java
index 081377899d..177d437710 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java
@@ -31,6 +31,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
+import java.util.UUID;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.commons.lang.StringUtils;
@@ -44,6 +45,7 @@ import org.apache.helix.task.TaskState;
import org.apache.helix.task.WorkflowConfig;
import org.apache.helix.task.WorkflowContext;
import org.apache.pinot.common.utils.DateTimeUtils;
+import org.apache.pinot.controller.api.exception.UnknownTaskTypeException;
import org.apache.pinot.core.minion.PinotTaskConfig;
import org.apache.pinot.spi.utils.CommonConstants.Helix;
import org.slf4j.Logger;
@@ -232,7 +234,29 @@ public class PinotHelixTaskResourceManager {
Preconditions.checkState(numConcurrentTasksPerInstance > 0);
String taskType = pinotTaskConfigs.get(0).getTaskType();
- String parentTaskName = TASK_PREFIX + taskType + TASK_NAME_SEPARATOR + System.currentTimeMillis();
+ String parentTaskName =
+ getParentTaskName(taskType, UUID.randomUUID() + "_" + System.currentTimeMillis());
+ return submitTask(parentTaskName, pinotTaskConfigs, minionInstanceTag, taskTimeoutMs,
+ numConcurrentTasksPerInstance);
+ }
+
+ /**
+ * Submit a list of child tasks with same task type to the Minion instances with the given tag.
+ *
+ * @param parentTaskName Parent task name to be submitted
+ * @param pinotTaskConfigs List of child task configs to be submitted
+ * @param minionInstanceTag Tag of the Minion instances to submit the task to
+ * @param taskTimeoutMs Timeout in milliseconds for each task
+ * @param numConcurrentTasksPerInstance Maximum number of concurrent tasks allowed per instance
+ * @return Name of the submitted parent task
+ */
+ public synchronized String submitTask(String parentTaskName, List<PinotTaskConfig> pinotTaskConfigs,
+ String minionInstanceTag, long taskTimeoutMs, int numConcurrentTasksPerInstance) {
+ int numChildTasks = pinotTaskConfigs.size();
+ Preconditions.checkState(numChildTasks > 0);
+ Preconditions.checkState(numConcurrentTasksPerInstance > 0);
+
+ String taskType = pinotTaskConfigs.get(0).getTaskType();
LOGGER
.info("Submitting parent task: {} of type: {} with {} child task configs: {} to Minion instances with tag: {}",
parentTaskName, taskType, numChildTasks, pinotTaskConfigs, minionInstanceTag);
@@ -347,7 +371,11 @@ public class PinotHelixTaskResourceManager {
*/
public synchronized TaskState getTaskState(String taskName) {
String taskType = getTaskType(taskName);
- return _taskDriver.getWorkflowContext(getHelixJobQueueName(taskType)).getJobState(getHelixJobName(taskName));
+ WorkflowContext workflowContext = _taskDriver.getWorkflowContext(getHelixJobQueueName(taskType));
+  // The removed chained call dereferenced a possibly-null workflow context; a missing context is now reported
+  // explicitly as an unknown task type.
+  // NOTE(review): a queue that was only just created may not have a workflow context yet, so callers that merely
+  // probe whether a task exists (e.g. before creating it) need to tolerate this exception — confirm with Helix.
+ if (workflowContext == null) {
+ throw new UnknownTaskTypeException("Workflow context for task type doesn't exist: " + taskType);
+ }
+  // Null when the queue has no job with this name.
+ return workflowContext.getJobState(getHelixJobName(taskName));
}
/**
@@ -611,6 +639,10 @@ public class PinotHelixTaskResourceManager {
return name.split(TASK_NAME_SEPARATOR)[1];
}
+  /**
+   * Builds the Helix parent task name for {@code taskName} of type {@code taskType}:
+   * {@code TASK_PREFIX + taskType + TASK_NAME_SEPARATOR + taskName}.
+   */
+  public String getParentTaskName(String taskType, String taskName) {
+    String typeAndName = String.join(TASK_NAME_SEPARATOR, taskType, taskName);
+    return TASK_PREFIX + typeAndName;
+  }
+
@JsonPropertyOrder({"taskState", "subtaskCount", "startTime", "executionStartTime", "subtaskInfos"})
@JsonInclude(JsonInclude.Include.NON_NULL)
public static class TaskDebugInfo {
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
index c805e98160..308ba5ca63 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
@@ -27,17 +27,22 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.Nullable;
import org.I0Itec.zkclient.IZkChildListener;
import org.apache.commons.collections.CollectionUtils;
import org.apache.helix.AccessOption;
import org.apache.helix.task.TaskState;
+import org.apache.pinot.common.exception.TableNotFoundException;
import org.apache.pinot.common.metrics.ControllerGauge;
import org.apache.pinot.common.metrics.ControllerMeter;
import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.LeadControllerManager;
+import org.apache.pinot.controller.api.exception.NoTaskScheduledException;
+import org.apache.pinot.controller.api.exception.TaskAlreadyExistsException;
+import org.apache.pinot.controller.api.exception.UnknownTaskTypeException;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.helix.core.minion.generator.PinotTaskGenerator;
import org.apache.pinot.controller.helix.core.minion.generator.TaskGeneratorRegistry;
@@ -45,6 +50,8 @@ import org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTas
import org.apache.pinot.core.minion.PinotTaskConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableTaskConfig;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.quartz.CronScheduleBuilder;
import org.quartz.JobBuilder;
import org.quartz.JobDataMap;
@@ -128,6 +135,71 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> {
}
}
+  /**
+   * Creates and submits an ad-hoc task of type {@code taskType} for {@code tableName}.
+   *
+   * <p>A raw table name resolves to whichever of its OFFLINE and REALTIME tables exist; a name with a type suffix
+   * resolves to that table only. Child tasks are generated per resolved table by the registered task generator and
+   * submitted under the parent task name derived from {@code taskType} and {@code taskName}.
+   *
+   * @param taskType minion task type
+   * @param tableName raw table name or table name with type
+   * @param taskName ad-hoc task name; when null, {@code <tableName>_<random UUID>} is used
+   * @param taskConfigs ad-hoc task configs; {@code minionInstanceTag} selects the minion tag (defaults to the
+   *                    untagged minion instance tag)
+   * @return map from table name with type to the name of the submitted parent task
+   * @throws UnknownTaskTypeException if no task generator is registered for {@code taskType}
+   * @throws TaskAlreadyExistsException if a task with the same type and name already exists
+   * @throws TableNotFoundException if no matching table exists
+   * @throws NoTaskScheduledException if no task was generated for any of the resolved tables
+   */
+  public Map<String, String> createTask(String taskType, String tableName, @Nullable String taskName,
+      Map<String, String> taskConfigs)
+      throws Exception {
+    if (taskName == null) {
+      taskName = tableName + "_" + UUID.randomUUID();
+      LOGGER.info("Task name is missing, auto-generate one: {}", taskName);
+    }
+    String minionInstanceTag =
+        taskConfigs.getOrDefault("minionInstanceTag", CommonConstants.Helix.UNTAGGED_MINION_INSTANCE);
+
+    // Resolve the generator first so an unregistered task type is reported as such, independently of Helix state.
+    PinotTaskGenerator taskGenerator = _taskGeneratorRegistry.getTaskGenerator(taskType);
+    if (taskGenerator == null) {
+      throw new UnknownTaskTypeException(
+          "Task type: " + taskType + " is not registered, cannot enable it for table: " + tableName);
+    }
+
+    String parentTaskName = _helixTaskResourceManager.getParentTaskName(taskType, taskName);
+    TaskState taskState;
+    try {
+      taskState = _helixTaskResourceManager.getTaskState(parentTaskName);
+    } catch (UnknownTaskTypeException e) {
+      // getTaskState() throws when the (registered) task type has no Helix workflow context yet, e.g. for the very
+      // first task of this type. Treat that as "no such task" instead of rejecting a valid request.
+      taskState = null;
+    }
+    if (taskState != null) {
+      throw new TaskAlreadyExistsException(
+          "Task [" + taskName + "] of type [" + taskType + "] is already created. Current state is " + taskState);
+    }
+
+    List<String> tableNameWithTypes = new ArrayList<>();
+    if (TableNameBuilder.getTableTypeFromTableName(tableName) == null) {
+      String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(tableName);
+      if (_pinotHelixResourceManager.hasOfflineTable(offlineTableName)) {
+        tableNameWithTypes.add(offlineTableName);
+      }
+      String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(tableName);
+      if (_pinotHelixResourceManager.hasRealtimeTable(realtimeTableName)) {
+        tableNameWithTypes.add(realtimeTableName);
+      }
+    } else {
+      if (_pinotHelixResourceManager.hasTable(tableName)) {
+        tableNameWithTypes.add(tableName);
+      }
+    }
+    if (tableNameWithTypes.isEmpty()) {
+      throw new TableNotFoundException("'tableName' " + tableName + " is not found");
+    }
+
+    _helixTaskResourceManager.ensureTaskQueueExists(taskType);
+    addTaskTypeMetricsUpdaterIfNeeded(taskType);
+
+    // responseMap holds the table to task name mapping.
+    // NOTE(review): every resolved table is submitted under the same parentTaskName; if the generator returns tasks
+    // for both the OFFLINE and REALTIME table, the second submission reuses an existing job name — confirm Helix
+    // rejects it cleanly, or derive a per-table name.
+    Map<String, String> responseMap = new HashMap<>();
+    for (String tableNameWithType : tableNameWithTypes) {
+      TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType);
+      LOGGER.info("Trying to create tasks of type: {}, table: {}", taskType, tableNameWithType);
+      List<PinotTaskConfig> pinotTaskConfigs = taskGenerator.generateTasks(tableConfig, taskConfigs);
+      if (pinotTaskConfigs.isEmpty()) {
+        LOGGER.warn("No ad-hoc task generated for task type: {}", taskType);
+        continue;
+      }
+      LOGGER.info("Submitting ad-hoc task for task type: {} with task configs: {}", taskType, pinotTaskConfigs);
+      _controllerMetrics.addMeteredTableValue(taskType, ControllerMeter.NUMBER_ADHOC_TASKS_SUBMITTED, 1);
+      responseMap.put(tableNameWithType,
+          _helixTaskResourceManager.submitTask(parentTaskName, pinotTaskConfigs, minionInstanceTag,
+              taskGenerator.getTaskTimeoutMs(), taskGenerator.getNumConcurrentTasksPerInstance()));
+    }
+    if (responseMap.isEmpty()) {
+      throw new NoTaskScheduledException("No task scheduled for 'tableName': " + tableName);
+    }
+    return responseMap;
+  }
+
private class ZkTableConfigChangeListener implements IZkChildListener {
@Override
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/BaseTaskGenerator.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/BaseTaskGenerator.java
index 4808d97a5c..071e285083 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/BaseTaskGenerator.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/BaseTaskGenerator.java
@@ -18,9 +18,14 @@
*/
package org.apache.pinot.controller.helix.core.minion.generator;
+import java.util.List;
+import java.util.Map;
import org.apache.helix.task.JobConfig;
+import org.apache.pinot.controller.api.exception.UnknownTaskTypeException;
import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor;
import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.core.minion.PinotTaskConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -68,4 +73,10 @@ public abstract class BaseTaskGenerator implements PinotTaskGenerator {
}
return JobConfig.DEFAULT_NUM_CONCURRENT_TASKS_PER_INSTANCE;
}
+
+  /**
+   * Ad-hoc generation is opt-in: generators that do not override this method reject every ad-hoc request.
+   *
+   * @throws UnknownTaskTypeException always
+   */
+  @Override
+  public List<PinotTaskConfig> generateTasks(TableConfig tableConfig, Map<String, String> taskConfigs)
+      throws Exception {
+    String unsupportedTaskType = getTaskType();
+    throw new UnknownTaskTypeException("Adhoc task generation is not supported for task type - " + unsupportedTaskType);
+  }
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/PinotTaskGenerator.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/PinotTaskGenerator.java
index 85eed0e7ab..fc469b409b 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/PinotTaskGenerator.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/PinotTaskGenerator.java
@@ -19,6 +19,7 @@
package org.apache.pinot.controller.helix.core.minion.generator;
import java.util.List;
+import java.util.Map;
import org.apache.helix.task.JobConfig;
import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor;
import org.apache.pinot.core.minion.PinotTaskConfig;
@@ -45,6 +46,12 @@ public interface PinotTaskGenerator {
*/
List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs);
+  /**
+   * Generates a list of ad-hoc tasks to schedule for the given table, using the given task configs.
+   *
+   * <p>This is a default method so that existing implementations of this interface that do not extend
+   * {@link BaseTaskGenerator} keep compiling (and, if already compiled, do not fail with {@link AbstractMethodError});
+   * such generators simply do not support ad-hoc generation.
+   *
+   * @param tableConfig config of the table to generate tasks for
+   * @param taskConfigs ad-hoc task configs supplied by the caller
+   * @return generated task configs, possibly empty
+   * @throws Exception if task generation fails
+   */
+  default List<PinotTaskConfig> generateTasks(TableConfig tableConfig, Map<String, String> taskConfigs)
+      throws Exception {
+    throw new org.apache.pinot.controller.api.exception.UnknownTaskTypeException(
+        "Adhoc task generation is not supported for task type - " + getTaskType());
+  }
+
/**
* Returns the timeout in milliseconds for each task, 3600000 (1 hour) by default.
*/
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segmentgenerationandpush/SegmentGenerationAndPushTaskGenerator.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segmentgenerationandpush/SegmentGenerationAndPushTaskGenerator.java
index 23e0c48428..94f4bbf956 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segmentgenerationandpush/SegmentGenerationAndPushTaskGenerator.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segmentgenerationandpush/SegmentGenerationAndPushTaskGenerator.java
@@ -19,6 +19,7 @@
package org.apache.pinot.plugin.minion.tasks.segmentgenerationandpush;
import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
@@ -32,6 +33,8 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.UUID;
+import javax.annotation.Nullable;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.segment.generation.SegmentGenerationUtils;
import org.apache.pinot.controller.helix.core.minion.generator.BaseTaskGenerator;
@@ -155,10 +158,14 @@ public class SegmentGenerationAndPushTaskGenerator extends BaseTaskGenerator {
+ "and input files from running tasks: {}", inputDirURI, existingSegmentInputFiles,
inputFilesFromRunningTasks);
List<URI> inputFileURIs = getInputFilesFromDirectory(batchConfigMap, inputDirURI, existingSegmentInputFiles);
- LOGGER.info("Final input files for task config generation: {}", inputFileURIs);
+ if (inputFileURIs.size() < 10) {
+ LOGGER.info("Final input files for task config generation: {}", inputFileURIs);
+ } else {
+ LOGGER.info("Final input files for task config generation: {}...", inputFileURIs.subList(0, 10));
+ }
for (URI inputFileURI : inputFileURIs) {
Map<String, String> singleFileGenerationTaskConfig =
- getSingleFileGenerationTaskConfig(offlineTableName, tableNumTasks, batchConfigMap, inputFileURI);
+ getSingleFileGenerationTaskConfig(offlineTableName, tableNumTasks, batchConfigMap, inputFileURI, null);
pinotTaskConfigs.add(new PinotTaskConfig(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE,
singleFileGenerationTaskConfig));
tableNumTasks++;
@@ -177,6 +184,76 @@ public class SegmentGenerationAndPushTaskGenerator extends BaseTaskGenerator {
return pinotTaskConfigs;
}
+  /**
+   * Generates ad-hoc SegmentGenerationAndPushTask configs for the given table, one child task per input file.
+   *
+   * <p>Batch configs are taken from the table's SegmentGenerationAndPushTask config (if any) and overridden by
+   * {@code taskConfigs}. Unless a segment name generator type is already configured, each child task gets a FIXED
+   * segment name {@code <table>_<uuid>_<sequence>}. Non-OFFLINE tables and empty input directories yield an empty list.
+   *
+   * @param tableConfig config of the table to generate tasks for
+   * @param taskConfigs ad-hoc task configs; together with the table task config they must provide the input dir URI
+   * @return generated task configs, possibly empty
+   * @throws Exception if the input directory cannot be listed or a task config cannot be built
+   */
+  @Override
+  public List<PinotTaskConfig> generateTasks(TableConfig tableConfig, Map<String, String> taskConfigs)
+      throws Exception {
+    String taskUUID = UUID.randomUUID().toString();
+    // Only generate tasks for OFFLINE tables
+    String offlineTableName = tableConfig.getTableName();
+    if (tableConfig.getTableType() != TableType.OFFLINE) {
+      LOGGER.warn("Skip generating SegmentGenerationAndPushTask for non-OFFLINE table: {}", offlineTableName);
+      return ImmutableList.of();
+    }
+
+    // Override task configs from table with adhoc task configs.
+    Map<String, String> batchConfigMap = new HashMap<>();
+    TableTaskConfig tableTaskConfig = tableConfig.getTaskConfig();
+    if (tableTaskConfig != null) {
+      // The table may have task configs for other task types only; putAll(null) would throw.
+      Map<String, String> tableBatchConfigs =
+          tableTaskConfig.getConfigsForTaskType(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE);
+      if (tableBatchConfigs != null) {
+        batchConfigMap.putAll(tableBatchConfigs);
+      }
+    }
+    batchConfigMap.putAll(taskConfigs);
+    String inputDirURIString = batchConfigMap.get(BatchConfigProperties.INPUT_DIR_URI);
+    Preconditions.checkArgument(inputDirURIString != null,
+        "Missing '%s' in task configs for table: %s", BatchConfigProperties.INPUT_DIR_URI, offlineTableName);
+
+    int tableNumTasks = 0;
+    try {
+      URI inputDirURI = SegmentGenerationUtils.getDirectoryURI(inputDirURIString);
+      List<URI> inputFileURIs = getInputFilesFromDirectory(batchConfigMap, inputDirURI, Collections.emptySet());
+      if (inputFileURIs.isEmpty()) {
+        LOGGER.warn("Skip generating SegmentGenerationAndPushTask, no input files found : {}", inputDirURI);
+        return ImmutableList.of();
+      }
+      // Resolve the input format once, before any task config copies batchConfigMap.
+      if (!batchConfigMap.containsKey(BatchConfigProperties.INPUT_FORMAT)) {
+        batchConfigMap.put(BatchConfigProperties.INPUT_FORMAT,
+            extractFormatFromFileSuffix(inputFileURIs.get(0).getPath()));
+      }
+      updateRecordReaderConfigs(batchConfigMap);
+
+      List<PinotTaskConfig> pinotTaskConfigs = new ArrayList<>();
+      // Keep the log bounded for large input directories.
+      if (inputFileURIs.size() < 10) {
+        LOGGER.info("Final input files for task config generation: {}", inputFileURIs);
+      } else {
+        LOGGER.info("Final input files for task config generation: {}...", inputFileURIs.subList(0, 10));
+      }
+      for (URI inputFileURI : inputFileURIs) {
+        Map<String, String> singleFileGenerationTaskConfig =
+            getSingleFileGenerationTaskConfig(offlineTableName, tableNumTasks, batchConfigMap, inputFileURI,
+                generateFixedSegmentName(offlineTableName, taskUUID, tableNumTasks));
+        pinotTaskConfigs.add(new PinotTaskConfig(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE,
+            singleFileGenerationTaskConfig));
+        tableNumTasks++;
+      }
+      return pinotTaskConfigs;
+    } catch (Exception e) {
+      LOGGER.error("Unable to generate the SegmentGenerationAndPush task. [ table configs: {}, task configs: {} ]",
+          tableConfig, taskConfigs, e);
+      throw e;
+    }
+  }
+
+  /**
+   * Builds the FIXED segment name {@code <offlineTableName>_<taskUUID>_<tableNumTasks>} for one ad-hoc child task.
+   *
+   * <p>Plain concatenation is used instead of {@code String.format("%d")}, which localizes digits according to the
+   * default locale and could therefore produce non-ASCII digits in segment names.
+   */
+  private String generateFixedSegmentName(String offlineTableName, String taskUUID, int tableNumTasks) {
+    return offlineTableName + "_" + taskUUID + "_" + tableNumTasks;
+  }
+
+  /**
+   * Derives the input format from the extension of the file name at the end of a URI path, e.g.
+   * {@code /data/2022/events.json} yields {@code json}.
+   *
+   * <p>Only the last path segment is inspected, so a dot in a parent directory (e.g. {@code /data/v1.2/events}) is
+   * not mistaken for a file extension.
+   *
+   * @param path URI path of an input file ('/'-separated)
+   * @return the file extension without the dot
+   * @throws UnsupportedOperationException if the file name has no (non-empty) extension
+   */
+  private String extractFormatFromFileSuffix(String path) {
+    int fileNameStart = path.lastIndexOf('/') + 1;
+    int lastDotPosition = path.lastIndexOf(IngestionConfigUtils.DOT_SEPARATOR);
+    if (lastDotPosition < fileNameStart || lastDotPosition == path.length() - 1) {
+      throw new UnsupportedOperationException("No file extension found in path: " + path);
+    }
+    return path.substring(lastDotPosition + 1);
+  }
+
private Set<String> getInputFilesFromRunningTasks(String offlineTableName) {
Set<String> inputFilesFromRunningTasks = new HashSet<>();
TaskGeneratorUtils
@@ -191,7 +268,7 @@ public class SegmentGenerationAndPushTaskGenerator extends BaseTaskGenerator {
}
private Map<String, String> getSingleFileGenerationTaskConfig(String offlineTableName, int sequenceID,
- Map<String, String> batchConfigMap, URI inputFileURI)
+ Map<String, String> batchConfigMap, URI inputFileURI, @Nullable String segmentName)
throws URISyntaxException {
URI inputDirURI = SegmentGenerationUtils.getDirectoryURI(batchConfigMap.get(BatchConfigProperties.INPUT_DIR_URI));
@@ -210,8 +287,18 @@ public class SegmentGenerationAndPushTaskGenerator extends BaseTaskGenerator {
singleFileGenerationTaskConfig.put(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI, outputSegmentDirURI.toString());
}
singleFileGenerationTaskConfig.put(BatchConfigProperties.SEQUENCE_ID, String.valueOf(sequenceID));
- singleFileGenerationTaskConfig
- .put(BatchConfigProperties.SEGMENT_NAME_GENERATOR_TYPE, BatchConfigProperties.SegmentNameGeneratorType.SIMPLE);
+ if (!singleFileGenerationTaskConfig.containsKey(BatchConfigProperties.SEGMENT_NAME_GENERATOR_TYPE)) {
+ if (segmentName == null) {
+ singleFileGenerationTaskConfig.put(BatchConfigProperties.SEGMENT_NAME_GENERATOR_TYPE,
+ BatchConfigProperties.SegmentNameGeneratorType.SIMPLE);
+ } else {
+ singleFileGenerationTaskConfig.put(BatchConfigProperties.SEGMENT_NAME_GENERATOR_TYPE,
+ BatchConfigProperties.SegmentNameGeneratorType.FIXED);
+ singleFileGenerationTaskConfig.put(
+ BatchConfigProperties.SEGMENT_NAME_GENERATOR_PROP_PREFIX + "." + BatchConfigProperties.SEGMENT_NAME,
+ segmentName);
+ }
+ }
if ((outputDirURI == null) || (pushMode == null)) {
singleFileGenerationTaskConfig.put(BatchConfigProperties.PUSH_MODE, DEFAULT_SEGMENT_PUSH_TYPE.toString());
} else {
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/task/AdhocTaskConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/task/AdhocTaskConfig.java
new file mode 100644
index 0000000000..3a1f5b1068
--- /dev/null
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/task/AdhocTaskConfig.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.config.task;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.pinot.spi.config.BaseJsonConfig;
+
+
+/**
+ * Configuration for an ad hoc minion task creation request.
+ *
+ * <p>{@code taskType} and {@code tableName} are required. {@code taskName} and {@code taskConfigs} are optional
+ * and may be {@code null}. Instances are immutable once constructed.
+ * <pre>
+ * Example:
+ * {
+ *   "taskType": "SegmentGenerationAndPushTask",
+ *   "tableName": "myTable",
+ *   "taskName": "myTask-0",
+ *   "taskConfigs": {
+ *     "inputDirURI": "s3://my-bucket/my-file.json",
+ *     "input.fs.className": "org.apache.pinot.plugin.filesystem.S3PinotFS",
+ *     "input.fs.prop.accessKey": "<aws-access-key>",
+ *     "input.fs.prop.secretKey": "<aws-secret-key>",
+ *     "input.fs.prop.region": "us-west-2"
+ *   }
+ * }
+ * </pre>
+ */
+public class AdhocTaskConfig extends BaseJsonConfig {
+  private final String _taskType;
+  private final String _tableName;
+  private final String _taskName;
+  private final Map<String, String> _taskConfigs;
+
+  /**
+   * Creates an ad hoc task config. Also used by Jackson as the JSON creator.
+   *
+   * @param taskType minion task type to create, e.g. {@code SegmentGenerationAndPushTask}; required
+   * @param tableName name of the table the task is created for; required
+   * @param taskName optional task name; may be {@code null}
+   * @param taskConfigs optional task-specific key/value configs; may be {@code null}
+   * @throws IllegalArgumentException if {@code taskType} or {@code tableName} is {@code null}
+   */
+  @JsonCreator
+  public AdhocTaskConfig(@JsonProperty(value = "taskType", required = true) String taskType,
+      @JsonProperty(value = "tableName", required = true) String tableName,
+      @JsonProperty(value = "taskName") @Nullable String taskName,
+      @JsonProperty("taskConfigs") @Nullable Map<String, String> taskConfigs) {
+    // The constructor is public and is called directly, not only through Jackson, so validate required fields here.
+    Preconditions.checkArgument(taskType != null, "'taskType' must be configured");
+    Preconditions.checkArgument(tableName != null, "'tableName' must be configured");
+    _taskType = taskType;
+    _tableName = tableName;
+    _taskName = taskName;
+    _taskConfigs = taskConfigs;
+  }
+
+  /** Returns the minion task type; never {@code null}. */
+  public String getTaskType() {
+    return _taskType;
+  }
+
+  /** Returns the name of the table the task is created for; never {@code null}. */
+  public String getTableName() {
+    return _tableName;
+  }
+
+  /** Returns the optional task name, or {@code null} if none was provided. */
+  @Nullable
+  public String getTaskName() {
+    return _taskName;
+  }
+
+  /** Returns the optional task-specific configs, or {@code null} if none were provided. */
+  @Nullable
+  public Map<String, String> getTaskConfigs() {
+    return _taskConfigs;
+  }
+}
diff --git a/pinot-spi/src/test/java/org/apache/pinot/spi/config/task/AdhocTaskConfigTest.java b/pinot-spi/src/test/java/org/apache/pinot/spi/config/task/AdhocTaskConfigTest.java
new file mode 100644
index 0000000000..9b82b73ab5
--- /dev/null
+++ b/pinot-spi/src/test/java/org/apache/pinot/spi/config/task/AdhocTaskConfigTest.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.config.task;
+
+import com.google.common.collect.ImmutableMap;
+import java.io.IOException;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+
+
+public class AdhocTaskConfigTest {
+
+  /** Serializes a config to JSON and back, verifying every field survives the round trip. */
+  @Test
+  public void testDeserializeFromJson()
+      throws IOException {
+    AdhocTaskConfig adhocTaskConfig = new AdhocTaskConfig("SegmentGenerationAndPushTask", "myTable", "myTask-0",
+        ImmutableMap.of("inputDirURI", "s3://my-bucket/my-file.json"));
+    adhocTaskConfig = JsonUtils.stringToObject(JsonUtils.objectToString(adhocTaskConfig), AdhocTaskConfig.class);
+    assertExampleConfig(adhocTaskConfig);
+  }
+
+  /** Deserializes a hand-written JSON payload, as a REST client would submit it. */
+  @Test
+  public void testDeserializeFromJsonString()
+      throws IOException {
+    String json = "{\"taskType\": \"SegmentGenerationAndPushTask\", \"tableName\": \"myTable\", "
+        + "\"taskName\": \"myTask-0\", \"taskConfigs\": {\"inputDirURI\": \"s3://my-bucket/my-file.json\"}}";
+    assertExampleConfig(JsonUtils.stringToObject(json, AdhocTaskConfig.class));
+  }
+
+  /** A missing task type must be rejected by the constructor. */
+  @Test(expectedExceptions = IllegalArgumentException.class)
+  public void testMissingTaskTypeRejected() {
+    new AdhocTaskConfig(null, "myTable", null, null);
+  }
+
+  /** A missing table name must be rejected by the constructor. */
+  @Test(expectedExceptions = IllegalArgumentException.class)
+  public void testMissingTableNameRejected() {
+    new AdhocTaskConfig("SegmentGenerationAndPushTask", null, null, null);
+  }
+
+  /** Asserts that the config carries exactly the example values used by the tests above. */
+  private static void assertExampleConfig(AdhocTaskConfig adhocTaskConfig) {
+    assertEquals(adhocTaskConfig.getTaskType(), "SegmentGenerationAndPushTask");
+    assertEquals(adhocTaskConfig.getTableName(), "myTable");
+    assertEquals(adhocTaskConfig.getTaskName(), "myTask-0");
+    assertEquals(adhocTaskConfig.getTaskConfigs().size(), 1);
+    assertEquals(adhocTaskConfig.getTaskConfigs().get("inputDirURI"), "s3://my-bucket/my-file.json");
+  }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org