You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by xi...@apache.org on 2022/04/13 22:56:34 UTC

[pinot] branch master updated: Add adhoc minion task creation endpoint (#8465)

This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 58ffe947b6 Add adhoc minion task creation endpoint (#8465)
58ffe947b6 is described below

commit 58ffe947b6c6509697fca8943282906e76351c90
Author: Xiang Fu <xi...@gmail.com>
AuthorDate: Wed Apr 13 15:56:27 2022 -0700

    Add adhoc minion task creation endpoint (#8465)
    
    * Support submitting SegmentGenerationAndPushTask in an adhoc manner
    1. Add adhoc task creation endpoint to PinotTaskRestletResource
    2. Add adhoc minion task submission endpoint and the implementation of SegmentGenerationAndPushTask
    
    * Address comments
---
 .../pinot/common/metrics/ControllerMeter.java      |  3 +-
 .../api/exception/NoTaskScheduledException.java    | 29 +++++++
 .../api/exception/TaskAlreadyExistsException.java  | 29 +++++++
 .../api/exception/UnknownTaskTypeException.java    | 29 +++++++
 .../api/resources/PinotTaskRestletResource.java    | 40 +++++++++
 .../core/minion/PinotHelixTaskResourceManager.java | 36 +++++++-
 .../helix/core/minion/PinotTaskManager.java        | 72 ++++++++++++++++
 .../core/minion/generator/BaseTaskGenerator.java   | 11 +++
 .../core/minion/generator/PinotTaskGenerator.java  |  7 ++
 .../SegmentGenerationAndPushTaskGenerator.java     | 97 ++++++++++++++++++++--
 .../pinot/spi/config/task/AdhocTaskConfig.java     | 81 ++++++++++++++++++
 .../pinot/spi/config/task/AdhocTaskConfigTest.java | 43 ++++++++++
 12 files changed, 469 insertions(+), 8 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMeter.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMeter.java
index 55f194856c..082893b7ff 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMeter.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMeter.java
@@ -50,7 +50,8 @@ public enum ControllerMeter implements AbstractMetrics.Meter {
   NUMBER_TASKS_SUBMITTED("tasks", false),
   NUMBER_SEGMENT_UPLOAD_TIMEOUT_EXCEEDED("SegmentUploadTimeouts", true),
   CRON_SCHEDULER_JOB_TRIGGERED("cronSchedulerJobTriggered", false),
-  LLC_SEGMENTS_DEEP_STORE_UPLOAD_RETRY_ERROR("LLCSegmentDeepStoreUploadRetryError", false);
+  LLC_SEGMENTS_DEEP_STORE_UPLOAD_RETRY_ERROR("LLCSegmentDeepStoreUploadRetryError", false),
+  NUMBER_ADHOC_TASKS_SUBMITTED("adhocTasks", false);
 
 
   private final String _brokerMeterName;
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/exception/NoTaskScheduledException.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/exception/NoTaskScheduledException.java
new file mode 100644
index 0000000000..7a986b8a97
--- /dev/null
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/exception/NoTaskScheduledException.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api.exception;
+
+public class NoTaskScheduledException extends RuntimeException {
+  public NoTaskScheduledException(String message) {
+    super(message);
+  }
+
+  public NoTaskScheduledException(String message, Throwable cause) {
+    super(message, cause);
+  }
+}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/exception/TaskAlreadyExistsException.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/exception/TaskAlreadyExistsException.java
new file mode 100644
index 0000000000..941cd2a767
--- /dev/null
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/exception/TaskAlreadyExistsException.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api.exception;
+
+public class TaskAlreadyExistsException extends RuntimeException {
+  public TaskAlreadyExistsException(String message) {
+    super(message);
+  }
+
+  public TaskAlreadyExistsException(String message, Throwable cause) {
+    super(message, cause);
+  }
+}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/exception/UnknownTaskTypeException.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/exception/UnknownTaskTypeException.java
new file mode 100644
index 0000000000..a283335483
--- /dev/null
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/exception/UnknownTaskTypeException.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api.exception;
+
+public class UnknownTaskTypeException extends RuntimeException {
+  public UnknownTaskTypeException(String message) {
+    super(message);
+  }
+
+  public UnknownTaskTypeException(String message, Throwable cause) {
+    super(message, cause);
+  }
+}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java
index 12b16edca6..0e52c5bbc4 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java
@@ -38,13 +38,21 @@ import javax.ws.rs.PUT;
 import javax.ws.rs.Path;
 import javax.ws.rs.PathParam;
 import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.Response;
+import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.helix.task.TaskPartitionState;
 import org.apache.helix.task.TaskState;
+import org.apache.pinot.common.exception.TableNotFoundException;
 import org.apache.pinot.controller.api.access.AccessType;
 import org.apache.pinot.controller.api.access.Authenticate;
+import org.apache.pinot.controller.api.exception.ControllerApplicationException;
+import org.apache.pinot.controller.api.exception.NoTaskScheduledException;
+import org.apache.pinot.controller.api.exception.TaskAlreadyExistsException;
+import org.apache.pinot.controller.api.exception.UnknownTaskTypeException;
 import org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager;
 import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
 import org.apache.pinot.core.minion.PinotTaskConfig;
+import org.apache.pinot.spi.config.task.AdhocTaskConfig;
 import org.quartz.CronTrigger;
 import org.quartz.JobDataMap;
 import org.quartz.JobDetail;
@@ -55,6 +63,8 @@ import org.quartz.SchedulerMetaData;
 import org.quartz.SimpleTrigger;
 import org.quartz.Trigger;
 import org.quartz.impl.matchers.GroupMatcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
@@ -67,6 +77,7 @@ import org.quartz.impl.matchers.GroupMatcher;
  *   <li>GET '/tasks/task/{taskName}/state': Get the task state for the given task</li>
  *   <li>GET '/tasks/task/{taskName}/config': Get the task config (a list of child task configs) for the given task</li>
  *   <li>POST '/tasks/schedule': Schedule tasks</li>
+ *   <li>POST '/tasks/execute': Execute an adhoc task</li>
  *   <li>PUT '/tasks/{taskType}/cleanup': Clean up finished tasks (COMPLETED, FAILED) for the given task type</li>
  *   <li>PUT '/tasks/{taskType}/stop': Stop all running/pending tasks (as well as the task queue) for the given task
  *   type</li>
@@ -78,6 +89,8 @@ import org.quartz.impl.matchers.GroupMatcher;
 @Api(tags = Constants.TASK_TAG)
 @Path("/")
 public class PinotTaskRestletResource {
+  public static final Logger LOGGER = LoggerFactory.getLogger(PinotTaskRestletResource.class);
+
   private static final String TASK_QUEUE_STATE_STOP = "STOP";
   private static final String TASK_QUEUE_STATE_RESUME = "RESUME";
 
@@ -366,6 +379,33 @@ public class PinotTaskRestletResource {
     }
   }
 
+  @POST
+  @Path("/tasks/execute")
+  @Authenticate(AccessType.CREATE)
+  @ApiOperation("Execute a task on minion")
+  public Map<String, String> executeAdhocTask(AdhocTaskConfig adhocTaskConfig) {
+    try {
+      return _pinotTaskManager.createTask(adhocTaskConfig.getTaskType(), adhocTaskConfig.getTableName(),
+          adhocTaskConfig.getTaskName(), adhocTaskConfig.getTaskConfigs());
+    } catch (TableNotFoundException e) {
+      throw new ControllerApplicationException(LOGGER, "Failed to find table: " + adhocTaskConfig.getTableName(),
+          Response.Status.NOT_FOUND, e);
+    } catch (TaskAlreadyExistsException e) {
+      throw new ControllerApplicationException(LOGGER, "Task already exists: " + adhocTaskConfig.getTaskName(),
+          Response.Status.CONFLICT, e);
+    } catch (UnknownTaskTypeException e) {
+      throw new ControllerApplicationException(LOGGER, "Unknown task type: " + adhocTaskConfig.getTaskType(),
+          Response.Status.NOT_FOUND, e);
+    } catch (NoTaskScheduledException e) {
+      throw new ControllerApplicationException(LOGGER,
+          "No task is generated for table: " + adhocTaskConfig.getTableName() + ", with task type: "
+              + adhocTaskConfig.getTaskType(), Response.Status.BAD_REQUEST);
+    } catch (Exception e) {
+      throw new ControllerApplicationException(LOGGER,
+          "Failed to create adhoc task: " + ExceptionUtils.getStackTrace(e), Response.Status.INTERNAL_SERVER_ERROR, e);
+    }
+  }
+
   @Deprecated
   @PUT
   @Path("/tasks/scheduletasks")
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java
index 081377899d..177d437710 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java
@@ -31,6 +31,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
+import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import javax.annotation.Nullable;
 import org.apache.commons.lang.StringUtils;
@@ -44,6 +45,7 @@ import org.apache.helix.task.TaskState;
 import org.apache.helix.task.WorkflowConfig;
 import org.apache.helix.task.WorkflowContext;
 import org.apache.pinot.common.utils.DateTimeUtils;
+import org.apache.pinot.controller.api.exception.UnknownTaskTypeException;
 import org.apache.pinot.core.minion.PinotTaskConfig;
 import org.apache.pinot.spi.utils.CommonConstants.Helix;
 import org.slf4j.Logger;
@@ -232,7 +234,29 @@ public class PinotHelixTaskResourceManager {
     Preconditions.checkState(numConcurrentTasksPerInstance > 0);
 
     String taskType = pinotTaskConfigs.get(0).getTaskType();
-    String parentTaskName = TASK_PREFIX + taskType + TASK_NAME_SEPARATOR + System.currentTimeMillis();
+    String parentTaskName =
+        getParentTaskName(taskType, UUID.randomUUID() + "_" + System.currentTimeMillis());
+    return submitTask(parentTaskName, pinotTaskConfigs, minionInstanceTag, taskTimeoutMs,
+        numConcurrentTasksPerInstance);
+  }
+
+  /**
+   * Submit a list of child tasks with same task type to the Minion instances with the given tag.
+   *
+   * @param parentTaskName Parent task name to be submitted
+   * @param pinotTaskConfigs List of child task configs to be submitted
+   * @param minionInstanceTag Tag of the Minion instances to submit the task to
+   * @param taskTimeoutMs Timeout in milliseconds for each task
+   * @param numConcurrentTasksPerInstance Maximum number of concurrent tasks allowed per instance
+   * @return Name of the submitted parent task
+   */
+  public synchronized String submitTask(String parentTaskName, List<PinotTaskConfig> pinotTaskConfigs,
+      String minionInstanceTag, long taskTimeoutMs, int numConcurrentTasksPerInstance) {
+    int numChildTasks = pinotTaskConfigs.size();
+    Preconditions.checkState(numChildTasks > 0);
+    Preconditions.checkState(numConcurrentTasksPerInstance > 0);
+
+    String taskType = pinotTaskConfigs.get(0).getTaskType();
     LOGGER
         .info("Submitting parent task: {} of type: {} with {} child task configs: {} to Minion instances with tag: {}",
             parentTaskName, taskType, numChildTasks, pinotTaskConfigs, minionInstanceTag);
@@ -347,7 +371,11 @@ public class PinotHelixTaskResourceManager {
    */
   public synchronized TaskState getTaskState(String taskName) {
     String taskType = getTaskType(taskName);
-    return _taskDriver.getWorkflowContext(getHelixJobQueueName(taskType)).getJobState(getHelixJobName(taskName));
+    WorkflowContext workflowContext = _taskDriver.getWorkflowContext(getHelixJobQueueName(taskType));
+    if (workflowContext == null) {
+      throw new UnknownTaskTypeException("Workflow context for task type doesn't exist: " + taskType);
+    }
+    return workflowContext.getJobState(getHelixJobName(taskName));
   }
 
   /**
@@ -611,6 +639,10 @@ public class PinotHelixTaskResourceManager {
     return name.split(TASK_NAME_SEPARATOR)[1];
   }
 
+  public String getParentTaskName(String taskType, String taskName) {
+    return TASK_PREFIX + taskType + TASK_NAME_SEPARATOR + taskName;
+  }
+
   @JsonPropertyOrder({"taskState", "subtaskCount", "startTime", "executionStartTime", "subtaskInfos"})
   @JsonInclude(JsonInclude.Include.NON_NULL)
   public static class TaskDebugInfo {
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
index c805e98160..308ba5ca63 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
@@ -27,17 +27,22 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import javax.annotation.Nullable;
 import org.I0Itec.zkclient.IZkChildListener;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.helix.AccessOption;
 import org.apache.helix.task.TaskState;
+import org.apache.pinot.common.exception.TableNotFoundException;
 import org.apache.pinot.common.metrics.ControllerGauge;
 import org.apache.pinot.common.metrics.ControllerMeter;
 import org.apache.pinot.common.metrics.ControllerMetrics;
 import org.apache.pinot.controller.ControllerConf;
 import org.apache.pinot.controller.LeadControllerManager;
+import org.apache.pinot.controller.api.exception.NoTaskScheduledException;
+import org.apache.pinot.controller.api.exception.TaskAlreadyExistsException;
+import org.apache.pinot.controller.api.exception.UnknownTaskTypeException;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.controller.helix.core.minion.generator.PinotTaskGenerator;
 import org.apache.pinot.controller.helix.core.minion.generator.TaskGeneratorRegistry;
@@ -45,6 +50,8 @@ import org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTas
 import org.apache.pinot.core.minion.PinotTaskConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableTaskConfig;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.quartz.CronScheduleBuilder;
 import org.quartz.JobBuilder;
 import org.quartz.JobDataMap;
@@ -128,6 +135,71 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> {
     }
   }
 
+  public Map<String, String> createTask(String taskType, String tableName, @Nullable String taskName,
+      Map<String, String> taskConfigs)
+      throws Exception {
+    if (taskName == null) {
+      taskName = tableName + "_" + UUID.randomUUID();
+      LOGGER.info("Task name is missing, auto-generate one: {}", taskName);
+    }
+    String minionInstanceTag =
+        taskConfigs.getOrDefault("minionInstanceTag", CommonConstants.Helix.UNTAGGED_MINION_INSTANCE);
+    String parentTaskName = _helixTaskResourceManager.getParentTaskName(taskType, taskName);
+    TaskState taskState = _helixTaskResourceManager.getTaskState(parentTaskName);
+    if (taskState != null) {
+      throw new TaskAlreadyExistsException(
+          "Task [" + taskName + "] of type [" + taskType + "] is already created. Current state is " + taskState);
+    }
+    List<String> tableNameWithTypes = new ArrayList<>();
+    if (TableNameBuilder.getTableTypeFromTableName(tableName) == null) {
+      String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(tableName);
+      if (_pinotHelixResourceManager.hasOfflineTable(offlineTableName)) {
+        tableNameWithTypes.add(offlineTableName);
+      }
+      String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(tableName);
+      if (_pinotHelixResourceManager.hasRealtimeTable(realtimeTableName)) {
+        tableNameWithTypes.add(realtimeTableName);
+      }
+    } else {
+      if (_pinotHelixResourceManager.hasTable(tableName)) {
+        tableNameWithTypes.add(tableName);
+      }
+    }
+    if (tableNameWithTypes.isEmpty()) {
+      throw new TableNotFoundException("'tableName' " + tableName + " is not found");
+    }
+
+    PinotTaskGenerator taskGenerator = _taskGeneratorRegistry.getTaskGenerator(taskType);
+    // Generate each type of tasks
+    if (taskGenerator == null) {
+      throw new UnknownTaskTypeException(
+          "Task type: " + taskType + " is not registered, cannot enable it for table: " + tableName);
+    }
+    _helixTaskResourceManager.ensureTaskQueueExists(taskType);
+    addTaskTypeMetricsUpdaterIfNeeded(taskType);
+
+    // responseMap holds the table to task name mapping.
+    Map<String, String> responseMap = new HashMap<>();
+    for (String tableNameWithType : tableNameWithTypes) {
+      TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType);
+      LOGGER.info("Trying to create tasks of type: {}, table: {}", taskType, tableNameWithType);
+      List<PinotTaskConfig> pinotTaskConfigs = taskGenerator.generateTasks(tableConfig, taskConfigs);
+      if (pinotTaskConfigs.isEmpty()) {
+        LOGGER.warn("No ad-hoc task generated for task type: {}", taskType);
+        continue;
+      }
+      LOGGER.info("Submitting ad-hoc task for task type: {} with task configs: {}", taskType, pinotTaskConfigs);
+      _controllerMetrics.addMeteredTableValue(taskType, ControllerMeter.NUMBER_ADHOC_TASKS_SUBMITTED, 1);
+      responseMap.put(tableNameWithType,
+          _helixTaskResourceManager.submitTask(parentTaskName, pinotTaskConfigs, minionInstanceTag,
+              taskGenerator.getTaskTimeoutMs(), taskGenerator.getNumConcurrentTasksPerInstance()));
+    }
+    if (responseMap.isEmpty()) {
+      throw new NoTaskScheduledException("No task scheduled for 'tableName': " + tableName);
+    }
+    return responseMap;
+  }
+
   private class ZkTableConfigChangeListener implements IZkChildListener {
 
     @Override
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/BaseTaskGenerator.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/BaseTaskGenerator.java
index 4808d97a5c..071e285083 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/BaseTaskGenerator.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/BaseTaskGenerator.java
@@ -18,9 +18,14 @@
  */
 package org.apache.pinot.controller.helix.core.minion.generator;
 
+import java.util.List;
+import java.util.Map;
 import org.apache.helix.task.JobConfig;
+import org.apache.pinot.controller.api.exception.UnknownTaskTypeException;
 import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor;
 import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.core.minion.PinotTaskConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -68,4 +73,10 @@ public abstract class BaseTaskGenerator implements PinotTaskGenerator {
     }
     return JobConfig.DEFAULT_NUM_CONCURRENT_TASKS_PER_INSTANCE;
   }
+
+  @Override
+  public List<PinotTaskConfig> generateTasks(TableConfig tableConfig, Map<String, String> taskConfigs)
+      throws Exception {
+    throw new UnknownTaskTypeException("Adhoc task generation is not supported for task type - " + this.getTaskType());
+  }
 }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/PinotTaskGenerator.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/PinotTaskGenerator.java
index 85eed0e7ab..fc469b409b 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/PinotTaskGenerator.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/PinotTaskGenerator.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.controller.helix.core.minion.generator;
 
 import java.util.List;
+import java.util.Map;
 import org.apache.helix.task.JobConfig;
 import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor;
 import org.apache.pinot.core.minion.PinotTaskConfig;
@@ -45,6 +46,12 @@ public interface PinotTaskGenerator {
    */
   List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs);
 
+  /**
+   * Generates a list of adhoc tasks to schedule based on the given table config and task configs.
+   */
+  List<PinotTaskConfig> generateTasks(TableConfig tableConfig, Map<String, String> taskConfigs)
+      throws Exception;
+
   /**
    * Returns the timeout in milliseconds for each task, 3600000 (1 hour) by default.
    */
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segmentgenerationandpush/SegmentGenerationAndPushTaskGenerator.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segmentgenerationandpush/SegmentGenerationAndPushTaskGenerator.java
index 23e0c48428..94f4bbf956 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segmentgenerationandpush/SegmentGenerationAndPushTaskGenerator.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segmentgenerationandpush/SegmentGenerationAndPushTaskGenerator.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.plugin.minion.tasks.segmentgenerationandpush;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
@@ -32,6 +33,8 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.UUID;
+import javax.annotation.Nullable;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.segment.generation.SegmentGenerationUtils;
 import org.apache.pinot.controller.helix.core.minion.generator.BaseTaskGenerator;
@@ -155,10 +158,14 @@ public class SegmentGenerationAndPushTaskGenerator extends BaseTaskGenerator {
                   + "and input files from running tasks: {}", inputDirURI, existingSegmentInputFiles,
               inputFilesFromRunningTasks);
           List<URI> inputFileURIs = getInputFilesFromDirectory(batchConfigMap, inputDirURI, existingSegmentInputFiles);
-          LOGGER.info("Final input files for task config generation: {}", inputFileURIs);
+          if (inputFileURIs.size() < 10) {
+            LOGGER.info("Final input files for task config generation: {}", inputFileURIs);
+          } else {
+            LOGGER.info("Final input files for task config generation: {}...", inputFileURIs.subList(0, 10));
+          }
           for (URI inputFileURI : inputFileURIs) {
             Map<String, String> singleFileGenerationTaskConfig =
-                getSingleFileGenerationTaskConfig(offlineTableName, tableNumTasks, batchConfigMap, inputFileURI);
+                getSingleFileGenerationTaskConfig(offlineTableName, tableNumTasks, batchConfigMap, inputFileURI, null);
             pinotTaskConfigs.add(new PinotTaskConfig(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE,
                 singleFileGenerationTaskConfig));
             tableNumTasks++;
@@ -177,6 +184,76 @@ public class SegmentGenerationAndPushTaskGenerator extends BaseTaskGenerator {
     return pinotTaskConfigs;
   }
 
+  @Override
+  public List<PinotTaskConfig> generateTasks(TableConfig tableConfig, Map<String, String> taskConfigs)
+      throws Exception {
+    String taskUUID = UUID.randomUUID().toString();
+    // Only generate tasks for OFFLINE tables
+    String offlineTableName = tableConfig.getTableName();
+    if (tableConfig.getTableType() != TableType.OFFLINE) {
+      LOGGER.warn("Skip generating SegmentGenerationAndPushTask for non-OFFLINE table: {}", offlineTableName);
+      return ImmutableList.of();
+    }
+
+    // Override task configs from table with adhoc task configs.
+    Map<String, String> batchConfigMap = new HashMap<>();
+    TableTaskConfig tableTaskConfig = tableConfig.getTaskConfig();
+    if (tableTaskConfig != null) {
+      batchConfigMap.putAll(
+          tableTaskConfig.getConfigsForTaskType(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE));
+    }
+    batchConfigMap.putAll(taskConfigs);
+
+    int tableNumTasks = 0;
+    try {
+      URI inputDirURI =
+          SegmentGenerationUtils.getDirectoryURI(batchConfigMap.get(BatchConfigProperties.INPUT_DIR_URI));
+      List<URI> inputFileURIs = getInputFilesFromDirectory(batchConfigMap, inputDirURI, Collections.emptySet());
+      if (inputFileURIs.isEmpty()) {
+        LOGGER.warn("Skip generating SegmentGenerationAndPushTask, no input files found : {}", inputDirURI);
+        return ImmutableList.of();
+      }
+      if (!batchConfigMap.containsKey(BatchConfigProperties.INPUT_FORMAT)) {
+        batchConfigMap.put(BatchConfigProperties.INPUT_FORMAT,
+            extractFormatFromFileSuffix(inputFileURIs.get(0).getPath()));
+      }
+      updateRecordReaderConfigs(batchConfigMap);
+
+      List<PinotTaskConfig> pinotTaskConfigs = new ArrayList<>();
+      LOGGER.info("Final input files for task config generation: {}", inputFileURIs);
+      for (URI inputFileURI : inputFileURIs) {
+        Map<String, String> singleFileGenerationTaskConfig =
+            getSingleFileGenerationTaskConfig(offlineTableName, tableNumTasks, batchConfigMap, inputFileURI,
+                generateFixedSegmentName(offlineTableName, taskUUID, tableNumTasks));
+        pinotTaskConfigs.add(new PinotTaskConfig(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE,
+            singleFileGenerationTaskConfig));
+        tableNumTasks++;
+      }
+      if (!batchConfigMap.containsKey(BatchConfigProperties.INPUT_FORMAT)) {
+        batchConfigMap.put(BatchConfigProperties.INPUT_FORMAT,
+            extractFormatFromFileSuffix(inputFileURIs.get(0).getPath()));
+        updateRecordReaderConfigs(batchConfigMap);
+      }
+      return pinotTaskConfigs;
+    } catch (Exception e) {
+      LOGGER.error("Unable to generate the SegmentGenerationAndPush task. [ table configs: {}, task configs: {} ]",
+          tableConfig, taskConfigs, e);
+      throw e;
+    }
+  }
+
+  private String generateFixedSegmentName(String offlineTableName, String taskUUID, int tableNumTasks) {
+    return String.format("%s_%s_%d", offlineTableName, taskUUID, tableNumTasks);
+  }
+
+  private String extractFormatFromFileSuffix(String path) {
+    int lastDotPosition = path.lastIndexOf(IngestionConfigUtils.DOT_SEPARATOR);
+    if (lastDotPosition < 0) {
+      throw new UnsupportedOperationException("No file extension found");
+    }
+    return path.substring(lastDotPosition + 1);
+  }
+
   private Set<String> getInputFilesFromRunningTasks(String offlineTableName) {
     Set<String> inputFilesFromRunningTasks = new HashSet<>();
     TaskGeneratorUtils
@@ -191,7 +268,7 @@ public class SegmentGenerationAndPushTaskGenerator extends BaseTaskGenerator {
   }
 
   private Map<String, String> getSingleFileGenerationTaskConfig(String offlineTableName, int sequenceID,
-      Map<String, String> batchConfigMap, URI inputFileURI)
+      Map<String, String> batchConfigMap, URI inputFileURI, @Nullable String segmentName)
       throws URISyntaxException {
 
     URI inputDirURI = SegmentGenerationUtils.getDirectoryURI(batchConfigMap.get(BatchConfigProperties.INPUT_DIR_URI));
@@ -210,8 +287,18 @@ public class SegmentGenerationAndPushTaskGenerator extends BaseTaskGenerator {
       singleFileGenerationTaskConfig.put(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI, outputSegmentDirURI.toString());
     }
     singleFileGenerationTaskConfig.put(BatchConfigProperties.SEQUENCE_ID, String.valueOf(sequenceID));
-    singleFileGenerationTaskConfig
-        .put(BatchConfigProperties.SEGMENT_NAME_GENERATOR_TYPE, BatchConfigProperties.SegmentNameGeneratorType.SIMPLE);
+    if (!singleFileGenerationTaskConfig.containsKey(BatchConfigProperties.SEGMENT_NAME_GENERATOR_TYPE)) {
+      if (segmentName == null) {
+        singleFileGenerationTaskConfig.put(BatchConfigProperties.SEGMENT_NAME_GENERATOR_TYPE,
+            BatchConfigProperties.SegmentNameGeneratorType.SIMPLE);
+      } else {
+        singleFileGenerationTaskConfig.put(BatchConfigProperties.SEGMENT_NAME_GENERATOR_TYPE,
+            BatchConfigProperties.SegmentNameGeneratorType.FIXED);
+        singleFileGenerationTaskConfig.put(
+            BatchConfigProperties.SEGMENT_NAME_GENERATOR_PROP_PREFIX + "." + BatchConfigProperties.SEGMENT_NAME,
+            segmentName);
+      }
+    }
     if ((outputDirURI == null) || (pushMode == null)) {
       singleFileGenerationTaskConfig.put(BatchConfigProperties.PUSH_MODE, DEFAULT_SEGMENT_PUSH_TYPE.toString());
     } else {
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/task/AdhocTaskConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/task/AdhocTaskConfig.java
new file mode 100644
index 0000000000..3a1f5b1068
--- /dev/null
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/task/AdhocTaskConfig.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.config.task;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.pinot.spi.config.BaseJsonConfig;
+
+
+/**
+ * Configuration of a minion task that is submitted in an adhoc manner.
+ *
+ * <p>{@code taskType} and {@code tableName} are mandatory; {@code taskName} and {@code taskConfigs} are optional and
+ * are {@code null} when not provided. Instances are created either directly through the constructor or by Jackson
+ * from JSON.
+ * <pre>
+ * Example:
+ * {
+ *   "taskType": "SegmentGenerationAndPushTask",
+ *   "tableName": "myTable",
+ *   "taskName": "myTask-0",
+ *   "taskConfigs": {
+ *     "inputDirURI": "s3://my-bucket/my-file.json",
+ *     "input.fs.className": "org.apache.pinot.plugin.filesystem.S3PinotFS",
+ *     "input.fs.prop.accessKey": "<aws-access-key>",
+ *     "input.fs.prop.secretKey": "<aws-secret-key>",
+ *     "input.fs.prop.region": "us-west-2"
+ *   }
+ * }
+ * </pre>
+ */
+public class AdhocTaskConfig extends BaseJsonConfig {
+  // All fields are assigned exactly once in the constructor; the class exposes no setters.
+  private final String _taskType;
+  private final String _tableName;
+  private final String _taskName;
+  private final Map<String, String> _taskConfigs;
+
+  /**
+   * Creates the config; also used by Jackson as the JSON creator.
+   *
+   * @param taskType type of the task to run, must not be {@code null}
+   * @param tableName name of the table the task is submitted for, must not be {@code null}
+   * @param taskName optional name of the task, may be {@code null}
+   * @param taskConfigs optional task configs, may be {@code null}; the map is stored as-is without copying
+   * @throws IllegalArgumentException if {@code taskType} or {@code tableName} is {@code null}
+   */
+  @JsonCreator
+  public AdhocTaskConfig(@JsonProperty(value = "taskType", required = true) String taskType,
+      @JsonProperty(value = "tableName", required = true) String tableName,
+      @JsonProperty(value = "taskName") @Nullable String taskName,
+      @JsonProperty("taskConfigs") @Nullable Map<String, String> taskConfigs) {
+    Preconditions.checkArgument(taskType != null, "'taskType' must be configured");
+    Preconditions.checkArgument(tableName != null, "'tableName' must be configured");
+    _taskType = taskType;
+    _tableName = tableName;
+    _taskName = taskName;
+    _taskConfigs = taskConfigs;
+  }
+
+  /**
+   * @return the task type, never {@code null}
+   */
+  public String getTaskType() {
+    return _taskType;
+  }
+
+  /**
+   * @return the table name, never {@code null}
+   */
+  public String getTableName() {
+    return _tableName;
+  }
+
+  /**
+   * @return the task name, or {@code null} if none was provided
+   */
+  @Nullable
+  public String getTaskName() {
+    return _taskName;
+  }
+
+  /**
+   * @return the task configs exactly as passed to the constructor, or {@code null} if none were provided
+   */
+  @Nullable
+  public Map<String, String> getTaskConfigs() {
+    return _taskConfigs;
+  }
+}
diff --git a/pinot-spi/src/test/java/org/apache/pinot/spi/config/task/AdhocTaskConfigTest.java b/pinot-spi/src/test/java/org/apache/pinot/spi/config/task/AdhocTaskConfigTest.java
new file mode 100644
index 0000000000..9b82b73ab5
--- /dev/null
+++ b/pinot-spi/src/test/java/org/apache/pinot/spi/config/task/AdhocTaskConfigTest.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.config.task;
+
+import com.google.common.collect.ImmutableMap;
+import java.io.IOException;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+
+
+public class AdhocTaskConfigTest {
+
+  /**
+   * Serializes an {@link AdhocTaskConfig} to JSON, reads it back and verifies that every field survives the
+   * round trip.
+   */
+  @Test
+  public void testDeserializeFromJson()
+      throws IOException {
+    AdhocTaskConfig expectedConfig = new AdhocTaskConfig("SegmentGenerationAndPushTask", "myTable", "myTask-0",
+        ImmutableMap.of("inputDirURI", "s3://my-bucket/my-file.json"));
+
+    // Round trip: object -> JSON string -> object.
+    String serializedConfig = JsonUtils.objectToString(expectedConfig);
+    AdhocTaskConfig actualConfig = JsonUtils.stringToObject(serializedConfig, AdhocTaskConfig.class);
+
+    assertEquals(actualConfig.getTaskType(), "SegmentGenerationAndPushTask");
+    assertEquals(actualConfig.getTableName(), "myTable");
+    assertEquals(actualConfig.getTaskName(), "myTask-0");
+    assertEquals(actualConfig.getTaskConfigs().size(), 1);
+    assertEquals(actualConfig.getTaskConfigs().get("inputDirURI"), "s3://my-bucket/my-file.json");
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org