Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2022/04/05 08:27:14 UTC

[GitHub] [pinot] xiangfu0 opened a new pull request, #8465: Add adhoc minion task creation endpoint

xiangfu0 opened a new pull request, #8465:
URL: https://github.com/apache/pinot/pull/8465

   ## Description
   1. Add an ad hoc minion task creation endpoint.
   2. Implement ad hoc task creation for SegmentGenerationAndPushTask.
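   In the `SegmentGenerationAndPushTaskGenerator` diff reviewed in this thread, ad hoc generation starts from the table's own task configs for the task type and then applies the configs sent with the request, so request values win on key collisions. A minimal sketch of that precedence, assuming plain maps in place of Pinot's `TableConfig`/`TableTaskConfig`; the class and method names are hypothetical:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper illustrating the override order; not part of Pinot's API.
final class AdhocConfigMerge {
  private AdhocConfigMerge() {
  }

  static Map<String, String> merge(Map<String, String> tableTaskConfigs, Map<String, String> adhocTaskConfigs) {
    Map<String, String> merged = new HashMap<>();
    // The table may have no task config section at all (null), as handled in the diff.
    if (tableTaskConfigs != null) {
      merged.putAll(tableTaskConfigs);
    }
    // Applied last, so values from the ad hoc request override table-level ones.
    merged.putAll(adhocTaskConfigs);
    return merged;
  }
}
```

   For example, merging a table config that sets `inputDirURI` and `inputFormat` with a request that only sets `inputDirURI` keeps the table's `inputFormat` and uses the request's `inputDirURI`. The key names are illustrative.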
   
   ## Upgrade Notes
   Does this PR prevent a zero-downtime upgrade? (Assume upgrade order: Controller, Broker, Server, Minion)
   * [ ] Yes (Please label as **<code>backward-incompat</code>**, and complete the section below on Release Notes)
   
   Does this PR fix a zero-downtime upgrade break introduced earlier?
   * [ ] Yes (Please label this as **<code>backward-incompat</code>**, and complete the section below on Release Notes)
   
   Does this PR otherwise need attention when creating release notes? Things to consider:
   - New configuration options
   - Deprecation of configurations
   - Signature changes to public methods/interfaces
   - New plugins added or old plugins removed
   * [ ] Yes (Please label this PR as **<code>release-notes</code>** and complete the section on Release Notes)
   ## Release Notes
   <!-- If you have tagged this as either backward-incompat or release-notes,
   you MUST add text here that you would like to see appear in release notes of the
   next release. -->
   
   <!-- If you have a series of commits adding or enabling a feature, then
   add this section only in final commit that marks the feature completed.
   Refer to earlier release notes to see examples of text.
   -->
   ## Documentation
   <!-- If you have introduced a new feature or configuration, please add it to the documentation as well.
   See https://docs.pinot.apache.org/developers/developers-and-contributors/update-document
   -->
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] xiangfu0 commented on a diff in pull request #8465: Add adhoc minion task creation endpoint

Posted by GitBox <gi...@apache.org>.
xiangfu0 commented on code in PR #8465:
URL: https://github.com/apache/pinot/pull/8465#discussion_r848692810


##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segmentgenerationandpush/SegmentGenerationAndPushTaskGenerator.java:
##########
@@ -177,6 +180,76 @@ public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
     return pinotTaskConfigs;
   }
 
+  @Override
+  public List<PinotTaskConfig> generateAdhocTasks(TableConfig tableConfig, Map<String, String> taskConfigs)
+      throws Exception {
+    String taskUUID = UUID.randomUUID().toString();
+    // Only generate tasks for OFFLINE tables
+    String offlineTableName = tableConfig.getTableName();
+    if (tableConfig.getTableType() != TableType.OFFLINE) {
+      LOGGER.warn("Skip generating SegmentGenerationAndPushTask for non-OFFLINE table: {}", offlineTableName);
+      return ImmutableList.of();
+    }
+
+    // Override task configs from table with adhoc task configs.
+    Map<String, String> batchConfigMap = new HashMap<>();
+    TableTaskConfig tableTaskConfig = tableConfig.getTaskConfig();
+    if (tableTaskConfig != null) {
+      batchConfigMap.putAll(
+          tableTaskConfig.getConfigsForTaskType(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE));
+    }
+    batchConfigMap.putAll(taskConfigs);
+
+    int tableNumTasks = 0;
+    try {
+      URI inputDirURI =
+          SegmentGenerationUtils.getDirectoryURI(batchConfigMap.get(BatchConfigProperties.INPUT_DIR_URI));
+      List<URI> inputFileURIs = getInputFilesFromDirectory(batchConfigMap, inputDirURI, Collections.emptySet());
+      if (inputFileURIs.isEmpty()) {
+        LOGGER.warn("Skip generating SegmentGenerationAndPushTask, no input files found : {}", inputDirURI);
+        return ImmutableList.of();

Review Comment:
   The response returns a mapping from table name to a list of task names.
   For example, if the input directory is empty or all files are filtered out, this should be an empty 2xx response that maps the table name to an empty list.
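   A minimal sketch of the response shape proposed in this comment, assuming a `Map<String, List<String>>` from table name to task names; the class name is hypothetical. A table whose input yields no files still appears in the response, mapped to an empty list, so the caller gets a 2xx rather than an error.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical response builder: table name -> names of the tasks created for that table.
final class AdhocTaskResponse {
  private AdhocTaskResponse() {
  }

  static Map<String, List<String>> build(String tableNameWithType, List<String> taskNames) {
    Map<String, List<String>> response = new LinkedHashMap<>();
    // An empty input directory (or fully filtered input) maps to an empty list, not an error.
    response.put(tableNameWithType, List.copyOf(taskNames));
    return response;
  }
}
```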
   





[GitHub] [pinot] xiangfu0 commented on a diff in pull request #8465: Add adhoc minion task creation endpoint

Posted by GitBox <gi...@apache.org>.
xiangfu0 commented on code in PR #8465:
URL: https://github.com/apache/pinot/pull/8465#discussion_r848721210


##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segmentgenerationandpush/SegmentGenerationAndPushTaskGenerator.java:
##########
@@ -177,6 +180,76 @@ public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
     return pinotTaskConfigs;
   }
 
+  @Override
+  public List<PinotTaskConfig> generateAdhocTasks(TableConfig tableConfig, Map<String, String> taskConfigs)
+      throws Exception {
+    String taskUUID = UUID.randomUUID().toString();
+    // Only generate tasks for OFFLINE tables
+    String offlineTableName = tableConfig.getTableName();
+    if (tableConfig.getTableType() != TableType.OFFLINE) {
+      LOGGER.warn("Skip generating SegmentGenerationAndPushTask for non-OFFLINE table: {}", offlineTableName);
+      return ImmutableList.of();
+    }
+
+    // Override task configs from table with adhoc task configs.
+    Map<String, String> batchConfigMap = new HashMap<>();
+    TableTaskConfig tableTaskConfig = tableConfig.getTaskConfig();
+    if (tableTaskConfig != null) {
+      batchConfigMap.putAll(
+          tableTaskConfig.getConfigsForTaskType(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE));
+    }
+    batchConfigMap.putAll(taskConfigs);
+
+    int tableNumTasks = 0;
+    try {
+      URI inputDirURI =
+          SegmentGenerationUtils.getDirectoryURI(batchConfigMap.get(BatchConfigProperties.INPUT_DIR_URI));
+      List<URI> inputFileURIs = getInputFilesFromDirectory(batchConfigMap, inputDirURI, Collections.emptySet());
+      if (inputFileURIs.isEmpty()) {
+        LOGGER.warn("Skip generating SegmentGenerationAndPushTask, no input files found : {}", inputDirURI);
+        return ImmutableList.of();

Review Comment:
   Updated the logic to skip tables with an empty task list, and to throw an exception if the resulting table-to-parent-task-name mapping is empty.
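   A sketch of the revised flow described here, under stated assumptions: generated tasks per table are modeled as string lists, `submitTasks` is a hypothetical callback that schedules a table's tasks and returns the parent task name, and `IllegalStateException` is an arbitrary choice because the comment does not name the exception type.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch: skip tables with no generated tasks, fail only if nothing at all was scheduled.
final class AdhocTaskScheduler {
  private AdhocTaskScheduler() {
  }

  static Map<String, String> schedule(Map<String, List<String>> generatedTasksPerTable,
      Function<List<String>, String> submitTasks) {
    Map<String, String> tableToParentTaskName = new LinkedHashMap<>();
    for (Map.Entry<String, List<String>> entry : generatedTasksPerTable.entrySet()) {
      List<String> tasks = entry.getValue();
      if (tasks.isEmpty()) {
        // Nothing to run for this table (e.g. no input files); skip it instead of failing the request.
        continue;
      }
      // submitTasks schedules the tasks and returns the parent task name.
      tableToParentTaskName.put(entry.getKey(), submitTasks.apply(tasks));
    }
    if (tableToParentTaskName.isEmpty()) {
      throw new IllegalStateException("No task was scheduled for any table");
    }
    return tableToParentTaskName;
  }
}
```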





[GitHub] [pinot] xiangfu0 merged pull request #8465: Add adhoc minion task creation endpoint

Posted by GitBox <gi...@apache.org>.
xiangfu0 merged PR #8465:
URL: https://github.com/apache/pinot/pull/8465




[GitHub] [pinot] xiangfu0 commented on a diff in pull request #8465: Add adhoc minion task creation endpoint

Posted by GitBox <gi...@apache.org>.
xiangfu0 commented on code in PR #8465:
URL: https://github.com/apache/pinot/pull/8465#discussion_r846458767


##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java:
##########
@@ -346,6 +347,29 @@ public StringResultResponse getTaskStateDeprecated(
     }
   }
 
+  @GET
+  @Path("/tasks/all")
+  @Authenticate(AccessType.READ)
+  @ApiOperation("Read all tasks status")
+  public Map<String, TaskState> getAllTasks() {
+    Map<String, TaskState> taskStates = new HashMap<>();
+    _pinotHelixTaskResourceManager.getTaskTypes()
+        .forEach(taskType -> taskStates.putAll(_pinotHelixTaskResourceManager.getTaskStates(taskType)));
+    return taskStates;
+  }
+
+  @POST
+  @Path("/tasks/create")
+  @Authenticate(AccessType.CREATE)

Review Comment:
   Updated to use an `AdhocTaskConfig`.
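   The comment does not show the class, so the following is only a guess at what a request object like `AdhocTaskConfig` could look like: the three fields this thread treats as mandatory (`taskType`, `tableName`, `taskName`) plus a free-form config map, validated with the same "Missing field" checks as the earlier `createTask` draft. Field and accessor names are assumptions.

```java
import java.util.Map;

// Hypothetical request object; the real AdhocTaskConfig in Pinot may differ.
final class AdhocTaskConfig {
  private final String _taskType;
  private final String _tableName;
  private final String _taskName;
  private final Map<String, String> _taskConfigs;

  AdhocTaskConfig(String taskType, String tableName, String taskName, Map<String, String> taskConfigs) {
    _taskType = requireField(taskType, "taskType");
    _tableName = requireField(tableName, "tableName");
    _taskName = requireField(taskName, "taskName");
    // Immutable defensive copy; a missing map is treated as "no extra configs".
    _taskConfigs = taskConfigs == null ? Map.of() : Map.copyOf(taskConfigs);
  }

  // Mirrors the IllegalArgumentException checks in the createTask draft.
  private static String requireField(String value, String fieldName) {
    if (value == null) {
      throw new IllegalArgumentException("Missing field '" + fieldName + "'");
    }
    return value;
  }

  String getTaskType() {
    return _taskType;
  }

  String getTableName() {
    return _tableName;
  }

  String getTaskName() {
    return _taskName;
  }

  Map<String, String> getTaskConfigs() {
    return _taskConfigs;
  }
}
```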





[GitHub] [pinot] xiangfu0 commented on a diff in pull request #8465: Add adhoc minion task creation endpoint

Posted by GitBox <gi...@apache.org>.
xiangfu0 commented on code in PR #8465:
URL: https://github.com/apache/pinot/pull/8465#discussion_r843193157


##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java:
##########
@@ -346,6 +347,29 @@ public StringResultResponse getTaskStateDeprecated(
     }
   }
 
+  @GET
+  @Path("/tasks/all")

Review Comment:
   We don't have a single API that shows all task states across all task types.
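   The `getAllTasks()` method in this diff flattens the per-type state maps into one map. A self-contained sketch of the same flattening, assuming a hypothetical `SimpleTaskState` enum in place of Helix's `TaskState` and an in-memory map in place of `PinotHelixTaskResourceManager`:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for Helix's TaskState; only a few values are modeled.
enum SimpleTaskState {
  IN_PROGRESS, COMPLETED, FAILED
}

final class AllTaskStates {
  private AllTaskStates() {
  }

  // statesByTaskType: task type -> (task name -> state), one getTaskStates(taskType) result per type.
  static Map<String, SimpleTaskState> collect(Map<String, Map<String, SimpleTaskState>> statesByTaskType) {
    Map<String, SimpleTaskState> allStates = new HashMap<>();
    // Flatten as getAllTasks() does; a task name shared by two types would be overwritten.
    statesByTaskType.values().forEach(allStates::putAll);
    return allStates;
  }
}
```

   The flattened map is lossless only if task names are unique across task types; if they were not, a map keyed by task type would be needed instead.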





[GitHub] [pinot] xiangfu0 commented on a diff in pull request #8465: Add adhoc minion task creation endpoint

Posted by GitBox <gi...@apache.org>.
xiangfu0 commented on code in PR #8465:
URL: https://github.com/apache/pinot/pull/8465#discussion_r843341607


##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java:
##########
@@ -346,6 +347,29 @@ public StringResultResponse getTaskStateDeprecated(
     }
   }
 
+  @GET
+  @Path("/tasks/all")

Review Comment:
   The eventual goal is to expose these API responses as an in-memory table, so that you can run queries like the following to check job status:
   
   ```
   SHOW LOAD;
   SHOW LOAD WHERE taskName = 'mytask-01';
   SHOW LOAD WHERE state = 'CANCELLED';
   SHOW LOAD WHERE state = 'LOADING';
   ```
   
   I think we may need a richer data structure to hold the task status, e.g. job progress (12/35 COMPLETED, 5/35 IN_PROGRESS), exception: xxxxx, elapsed time: 12 mins, etc.
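   A sketch of the progress summary suggested here, assuming each subtask reports one of a hypothetical set of states. It only computes the `12/35 COMPLETED, 5/35 IN_PROGRESS` style string and does not model the exception or elapsed-time fields.

```java
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical per-subtask states; a real status object would be derived from Helix.
enum SubtaskState {
  COMPLETED, IN_PROGRESS, FAILED
}

final class TaskProgress {
  private TaskProgress() {
  }

  // Renders counts like "12/35 COMPLETED, 5/35 IN_PROGRESS"; states with no subtasks are omitted.
  static String summarize(List<SubtaskState> subtaskStates) {
    Map<SubtaskState, Integer> counts = new EnumMap<>(SubtaskState.class);
    for (SubtaskState state : subtaskStates) {
      counts.merge(state, 1, Integer::sum);
    }
    int total = subtaskStates.size();
    StringJoiner joiner = new StringJoiner(", ");
    // EnumMap iterates in enum declaration order, so the output order is stable.
    counts.forEach((state, count) -> joiner.add(count + "/" + total + " " + state));
    return joiner.toString();
  }
}
```

   For example, two COMPLETED subtasks and one IN_PROGRESS subtask produce `2/3 COMPLETED, 1/3 IN_PROGRESS`.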
   





[GitHub] [pinot] xiangfu0 commented on a diff in pull request #8465: Add adhoc minion task creation endpoint

Posted by GitBox <gi...@apache.org>.
xiangfu0 commented on code in PR #8465:
URL: https://github.com/apache/pinot/pull/8465#discussion_r847830168


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/PinotTaskGenerator.java:
##########
@@ -45,6 +46,12 @@ public interface PinotTaskGenerator {
    */
   List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs);
 
+  /**
+   * Generates a list of adhoc tasks to schedule based on the given table configs and task configs.
+   */
+  List<PinotTaskConfig> generateAdhocTasks(TableConfig tableConfig, Map<String, String> taskConfigs)

Review Comment:
   changed.



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/BaseTaskGenerator.java:
##########
@@ -68,4 +72,11 @@ public int getNumConcurrentTasksPerInstance() {
     }
     return JobConfig.DEFAULT_NUM_CONCURRENT_TASKS_PER_INSTANCE;
   }
+
+  @Override
+  public List<PinotTaskConfig> generateAdhocTasks(TableConfig tableConfig, Map<String, String> taskConfigs)
+      throws Exception {
+    throw new UnsupportedOperationException(

Review Comment:
   done
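   The two hunks above follow a common pattern: an interface method plus a base-class default that throws `UnsupportedOperationException`, so only generators that support ad hoc runs need to override it. A reduced sketch with hypothetical class names and `String` standing in for `PinotTaskConfig`; the `throws Exception` clause from the PR signature is omitted to keep the sketch short.

```java
import java.util.List;
import java.util.Map;

// Reduced stand-in for PinotTaskGenerator: String replaces PinotTaskConfig.
interface SketchTaskGenerator {
  String getTaskType();

  List<String> generateAdhocTasks(String tableNameWithType, Map<String, String> taskConfigs);
}

// Default behaviour: task types opt in to ad hoc generation by overriding this method.
abstract class SketchBaseTaskGenerator implements SketchTaskGenerator {
  @Override
  public List<String> generateAdhocTasks(String tableNameWithType, Map<String, String> taskConfigs) {
    throw new UnsupportedOperationException(
        "Ad hoc task generation is not supported for task type: " + getTaskType());
  }
}

// A generator that supports ad hoc runs (hypothetical, loosely modeled on SegmentGenerationAndPushTask).
final class AdhocSupportingGenerator extends SketchBaseTaskGenerator {
  @Override
  public String getTaskType() {
    return "SegmentGenerationAndPushTask";
  }

  @Override
  public List<String> generateAdhocTasks(String tableNameWithType, Map<String, String> taskConfigs) {
    // One illustrative task description per request.
    return List.of(getTaskType() + " for " + tableNameWithType);
  }
}

// A generator that keeps the default and therefore rejects ad hoc runs.
final class ScheduledOnlyGenerator extends SketchBaseTaskGenerator {
  @Override
  public String getTaskType() {
    return "ScheduledOnlyTask";
  }
}
```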





[GitHub] [pinot] xiangfu0 commented on a diff in pull request #8465: Add adhoc minion task creation endpoint

Posted by GitBox <gi...@apache.org>.
xiangfu0 commented on code in PR #8465:
URL: https://github.com/apache/pinot/pull/8465#discussion_r847829828


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java:
##########
@@ -27,8 +27,10 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import javax.annotation.Nullable;
+import javax.ws.rs.NotFoundException;

Review Comment:
   done.





[GitHub] [pinot] codecov-commenter commented on pull request #8465: Add adhoc minion task creation endpoint

Posted by GitBox <gi...@apache.org>.
codecov-commenter commented on PR #8465:
URL: https://github.com/apache/pinot/pull/8465#issuecomment-1089112687

   # [Codecov](https://codecov.io/gh/apache/pinot/pull/8465?src=pr&el=h1) Report
   > Merging [#8465](https://codecov.io/gh/apache/pinot/pull/8465?src=pr&el=desc) (8691cf3) into [master](https://codecov.io/gh/apache/pinot/commit/5e56b9a155b9f5d7ba3cd7214c15b3f1c697a351?el=desc) (5e56b9a) will **decrease** coverage by `6.60%`.
   > The diff coverage is `0.00%`.
   
   > :exclamation: Current head 8691cf3 differs from pull request most recent head 471168c. Consider uploading reports for the commit 471168c to get more accurate results
   
   ```diff
   @@             Coverage Diff              @@
   ##             master    #8465      +/-   ##
   ============================================
   - Coverage     70.70%   64.10%   -6.61%     
   + Complexity     4285     4281       -4     
   ============================================
     Files          1664     1619      -45     
     Lines         87342    85532    -1810     
     Branches      13227    13042     -185     
   ============================================
   - Hits          61757    54832    -6925     
   - Misses        21296    26710    +5414     
   + Partials       4289     3990     -299     
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | integration1 | `?` | |
   | integration2 | `?` | |
   | unittests1 | `67.03% <ø> (-0.07%)` | :arrow_down: |
   | unittests2 | `14.12% <0.00%> (-0.05%)` | :arrow_down: |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/pinot/pull/8465?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [...roller/api/resources/PinotTaskRestletResource.java](https://codecov.io/gh/apache/pinot/pull/8465/diff?src=pr&el=tree#diff-cGlub3QtY29udHJvbGxlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29udHJvbGxlci9hcGkvcmVzb3VyY2VzL1Bpbm90VGFza1Jlc3RsZXRSZXNvdXJjZS5qYXZh) | `0.00% <0.00%> (ø)` | |
   | [...lix/core/minion/PinotHelixTaskResourceManager.java](https://codecov.io/gh/apache/pinot/pull/8465/diff?src=pr&el=tree#diff-cGlub3QtY29udHJvbGxlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29udHJvbGxlci9oZWxpeC9jb3JlL21pbmlvbi9QaW5vdEhlbGl4VGFza1Jlc291cmNlTWFuYWdlci5qYXZh) | `2.62% <0.00%> (-41.57%)` | :arrow_down: |
   | [...controller/helix/core/minion/PinotTaskManager.java](https://codecov.io/gh/apache/pinot/pull/8465/diff?src=pr&el=tree#diff-cGlub3QtY29udHJvbGxlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29udHJvbGxlci9oZWxpeC9jb3JlL21pbmlvbi9QaW5vdFRhc2tNYW5hZ2VyLmphdmE=) | `40.19% <0.00%> (-27.33%)` | :arrow_down: |
   | [...helix/core/minion/generator/BaseTaskGenerator.java](https://codecov.io/gh/apache/pinot/pull/8465/diff?src=pr&el=tree#diff-cGlub3QtY29udHJvbGxlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29udHJvbGxlci9oZWxpeC9jb3JlL21pbmlvbi9nZW5lcmF0b3IvQmFzZVRhc2tHZW5lcmF0b3IuamF2YQ==) | `54.54% <0.00%> (-35.46%)` | :arrow_down: |
   | [...elix/core/minion/generator/PinotTaskGenerator.java](https://codecov.io/gh/apache/pinot/pull/8465/diff?src=pr&el=tree#diff-cGlub3QtY29udHJvbGxlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29udHJvbGxlci9oZWxpeC9jb3JlL21pbmlvbi9nZW5lcmF0b3IvUGlub3RUYXNrR2VuZXJhdG9yLmphdmE=) | `0.00% <ø> (-33.34%)` | :arrow_down: |
   | [...che/pinot/controller/util/FileIngestionHelper.java](https://codecov.io/gh/apache/pinot/pull/8465/diff?src=pr&el=tree#diff-cGlub3QtY29udHJvbGxlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29udHJvbGxlci91dGlsL0ZpbGVJbmdlc3Rpb25IZWxwZXIuamF2YQ==) | `91.02% <ø> (ø)` | |
   | [...nandpush/SegmentGenerationAndPushTaskExecutor.java](https://codecov.io/gh/apache/pinot/pull/8465/diff?src=pr&el=tree#diff-cGlub3QtcGx1Z2lucy9waW5vdC1taW5pb24tdGFza3MvcGlub3QtbWluaW9uLWJ1aWx0aW4tdGFza3Mvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3Bpbm90L3BsdWdpbi9taW5pb24vdGFza3Mvc2VnbWVudGdlbmVyYXRpb25hbmRwdXNoL1NlZ21lbnRHZW5lcmF0aW9uQW5kUHVzaFRhc2tFeGVjdXRvci5qYXZh) | `0.00% <0.00%> (ø)` | |
   | [...andpush/SegmentGenerationAndPushTaskGenerator.java](https://codecov.io/gh/apache/pinot/pull/8465/diff?src=pr&el=tree#diff-cGlub3QtcGx1Z2lucy9waW5vdC1taW5pb24tdGFza3MvcGlub3QtbWluaW9uLWJ1aWx0aW4tdGFza3Mvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3Bpbm90L3BsdWdpbi9taW5pb24vdGFza3Mvc2VnbWVudGdlbmVyYXRpb25hbmRwdXNoL1NlZ21lbnRHZW5lcmF0aW9uQW5kUHVzaFRhc2tHZW5lcmF0b3IuamF2YQ==) | `2.16% <0.00%> (-0.78%)` | :arrow_down: |
   | [...va/org/apache/pinot/core/routing/RoutingTable.java](https://codecov.io/gh/apache/pinot/pull/8465/diff?src=pr&el=tree#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9yb3V0aW5nL1JvdXRpbmdUYWJsZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...va/org/apache/pinot/common/config/NettyConfig.java](https://codecov.io/gh/apache/pinot/pull/8465/diff?src=pr&el=tree#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vY29uZmlnL05ldHR5Q29uZmlnLmphdmE=) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | ... and [381 more](https://codecov.io/gh/apache/pinot/pull/8465/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/pinot/pull/8465?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/pinot/pull/8465?src=pr&el=footer). Last update [5e56b9a...471168c](https://codecov.io/gh/apache/pinot/pull/8465?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   




[GitHub] [pinot] xiangfu0 commented on a diff in pull request #8465: Add adhoc minion task creation endpoint

Posted by GitBox <gi...@apache.org>.
xiangfu0 commented on code in PR #8465:
URL: https://github.com/apache/pinot/pull/8465#discussion_r843359033


##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java:
##########
@@ -346,6 +347,29 @@ public StringResultResponse getTaskStateDeprecated(
     }
   }
 
+  @GET
+  @Path("/tasks/all")
+  @Authenticate(AccessType.READ)
+  @ApiOperation("Read all tasks status")
+  public Map<String, TaskState> getAllTasks() {
+    Map<String, TaskState> taskStates = new HashMap<>();
+    _pinotHelixTaskResourceManager.getTaskTypes()
+        .forEach(taskType -> taskStates.putAll(_pinotHelixTaskResourceManager.getTaskStates(taskType)));
+    return taskStates;
+  }
+
+  @POST
+  @Path("/tasks/create")

Review Comment:
   Sure, will change it to `"/tasks/{taskType}/{tableName}/{taskName}/execute"`.
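   A sketch of building a request against the path proposed in this comment, using the JDK's `java.net.http` types (Java 11+). The controller address, the example names, and the JSON body (the extra `taskConfigs` map, already serialized) are assumptions; the merged endpoint may differ from this proposal, and the request is only built here, not sent.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;

// Hypothetical client helper for the proposed /tasks/{taskType}/{tableName}/{taskName}/execute path.
final class AdhocTaskEndpoint {
  private AdhocTaskEndpoint() {
  }

  static URI executeUri(String controllerBaseUrl, String taskType, String tableName, String taskName) {
    return URI.create(controllerBaseUrl + "/tasks/" + encodeSegment(taskType) + "/" + encodeSegment(tableName)
        + "/" + encodeSegment(taskName) + "/execute");
  }

  // POST with the extra task configs as a JSON body (serialization is left to the caller).
  static HttpRequest executeRequest(URI uri, String taskConfigsJson) {
    return HttpRequest.newBuilder(uri)
        .header("Content-Type", "application/json")
        .POST(HttpRequest.BodyPublishers.ofString(taskConfigsJson))
        .build();
  }

  private static String encodeSegment(String segment) {
    // URLEncoder does form encoding (space -> '+'); path segments need %20 instead.
    // A literal '+' is already escaped as %2B, so this replace only affects spaces.
    return URLEncoder.encode(segment, StandardCharsets.UTF_8).replace("+", "%20");
  }
}
```

   The built request could then be sent with `HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())`.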





[GitHub] [pinot] klsince commented on a diff in pull request #8465: Add adhoc minion task creation endpoint

Posted by GitBox <gi...@apache.org>.
klsince commented on code in PR #8465:
URL: https://github.com/apache/pinot/pull/8465#discussion_r843185515


##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java:
##########
@@ -346,6 +347,29 @@ public StringResultResponse getTaskStateDeprecated(
     }
   }
 
+  @GET
+  @Path("/tasks/all")

Review Comment:
   When would this API be used? I see `/tasks/{taskType}/taskstates` already exists.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] mcvsubbu commented on a diff in pull request #8465: Add adhoc minion task creation endpoint

Posted by GitBox <gi...@apache.org>.
mcvsubbu commented on code in PR #8465:
URL: https://github.com/apache/pinot/pull/8465#discussion_r848694288


##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segmentgenerationandpush/SegmentGenerationAndPushTaskGenerator.java:
##########
@@ -177,6 +180,76 @@ public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
     return pinotTaskConfigs;
   }
 
+  @Override
+  public List<PinotTaskConfig> generateAdhocTasks(TableConfig tableConfig, Map<String, String> taskConfigs)
+      throws Exception {
+    String taskUUID = UUID.randomUUID().toString();
+    // Only generate tasks for OFFLINE tables
+    String offlineTableName = tableConfig.getTableName();
+    if (tableConfig.getTableType() != TableType.OFFLINE) {
+      LOGGER.warn("Skip generating SegmentGenerationAndPushTask for non-OFFLINE table: {}", offlineTableName);
+      return ImmutableList.of();

Review Comment:
   right





[GitHub] [pinot] xiangfu0 commented on a diff in pull request #8465: Add adhoc minion task creation endpoint

Posted by GitBox <gi...@apache.org>.
xiangfu0 commented on code in PR #8465:
URL: https://github.com/apache/pinot/pull/8465#discussion_r843349065


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java:
##########
@@ -128,6 +132,53 @@ public PinotTaskManager(PinotHelixTaskResourceManager helixTaskResourceManager,
     }
   }
 
+  public String createTask(Map<String, String> taskConfigs)
+      throws Exception {
+    String taskName = taskConfigs.get("taskName");
+    if (taskName == null) {
+      throw new IllegalArgumentException("Missing field 'taskName'");
+    }
+    String taskType = taskConfigs.get("taskType");
+    if (taskType == null) {
+      throw new IllegalArgumentException("Missing field 'taskType'");
+    }
+    String tableName = taskConfigs.get("tableName");
+    if (tableName == null) {
+      throw new IllegalArgumentException("Missing field 'tableName'");
+    }
+    String parentTaskName = _helixTaskResourceManager.getParentTaskName(taskType, taskName);
+    TaskState taskState = _helixTaskResourceManager.getTaskState(parentTaskName);
+    if (taskState != null) {
+      throw new RuntimeException(
+          "Task [" + taskName + "] of type [" + taskType + "] is already created. Current state is " + taskState);
+    }
+    if (TableNameBuilder.isRealtimeTableResource(tableName)) {
+      throw new UnsupportedOperationException(
+          "Realtime table: " + tableName + " is not supported for task type - " + taskType);
+    }
+    String tableNameWithType =
+        TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(TableNameBuilder.extractRawTableName(tableName));
+    TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType);
+    PinotTaskGenerator taskGenerator = _taskGeneratorRegistry.getTaskGenerator(taskType);
+    // Generate each type of tasks
+    if (taskGenerator != null) {
+      _helixTaskResourceManager.ensureTaskQueueExists(taskType);
+      addTaskTypeMetricsUpdaterIfNeeded(taskType);
+      LOGGER.info("Trying to create a task type: {}", taskGenerator.getTaskType());
+      List<PinotTaskConfig> pinotTaskConfigs = taskGenerator.generateAdhocTasks(tableConfig, taskConfigs);
+      if (pinotTaskConfigs.isEmpty()) {
+        throw new NotFoundException("No task config has been generated");

Review Comment:
   Will change this to return an empty result instead.
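   The `createTask` hunk above also normalizes the table name: realtime tables are rejected, and any other name is converted to the OFFLINE table name. A string-only sketch of that step, assuming Pinot's `_OFFLINE`/`_REALTIME` type suffixes; the helper is hypothetical and stands in for the `TableNameBuilder` calls in the diff.

```java
// Hypothetical helper standing in for the TableNameBuilder calls in createTask.
final class OfflineTableNames {
  private static final String OFFLINE_SUFFIX = "_OFFLINE";
  private static final String REALTIME_SUFFIX = "_REALTIME";

  private OfflineTableNames() {
  }

  static String toOfflineTableName(String tableName, String taskType) {
    // Same rejection as the diff: realtime tables are not supported for this task type.
    if (tableName.endsWith(REALTIME_SUFFIX)) {
      throw new UnsupportedOperationException(
          "Realtime table: " + tableName + " is not supported for task type - " + taskType);
    }
    // Extract the raw name (drop an existing _OFFLINE suffix), then add the OFFLINE suffix.
    String rawTableName = tableName.endsWith(OFFLINE_SUFFIX)
        ? tableName.substring(0, tableName.length() - OFFLINE_SUFFIX.length())
        : tableName;
    return rawTableName + OFFLINE_SUFFIX;
  }
}
```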
   





[GitHub] [pinot] xiangfu0 commented on a diff in pull request #8465: Add adhoc minion task creation endpoint

Posted by GitBox <gi...@apache.org>.
xiangfu0 commented on code in PR #8465:
URL: https://github.com/apache/pinot/pull/8465#discussion_r843360981


##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java:
##########
@@ -346,6 +347,29 @@ public StringResultResponse getTaskStateDeprecated(
     }
   }
 
+  @GET
+  @Path("/tasks/all")
+  @Authenticate(AccessType.READ)
+  @ApiOperation("Read all tasks status")
+  public Map<String, TaskState> getAllTasks() {
+    Map<String, TaskState> taskStates = new HashMap<>();
+    _pinotHelixTaskResourceManager.getTaskTypes()
+        .forEach(taskType -> taskStates.putAll(_pinotHelixTaskResourceManager.getTaskStates(taskType)));
+    return taskStates;
+  }
+
+  @POST
+  @Path("/tasks/create")
+  @Authenticate(AccessType.CREATE)

Review Comment:
   Will change this method to require taskType, tableName, and taskName as parameters, plus an extra map of configs.
   ```
   public Map<String, String> executeAdhocTask(
         @ApiParam(value = "Task type", required = true) @PathParam("taskType") String taskType,
         @ApiParam(value = "Table name", required = true) @PathParam("tableName") String tableName,
         @ApiParam(value = "Task name, unique within a task type", required = true) @PathParam("taskName")
             String taskName, Map<String, String> taskConfigs) {
   ```





[GitHub] [pinot] xiangfu0 commented on a diff in pull request #8465: Add adhoc minion task creation endpoint

Posted by GitBox <gi...@apache.org>.
xiangfu0 commented on code in PR #8465:
URL: https://github.com/apache/pinot/pull/8465#discussion_r848691516


##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segmentgenerationandpush/SegmentGenerationAndPushTaskGenerator.java:
##########
@@ -177,6 +180,76 @@ public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
     return pinotTaskConfigs;
   }
 
+  @Override
+  public List<PinotTaskConfig> generateAdhocTasks(TableConfig tableConfig, Map<String, String> taskConfigs)
+      throws Exception {
+    String taskUUID = UUID.randomUUID().toString();
+    // Only generate tasks for OFFLINE tables
+    String offlineTableName = tableConfig.getTableName();
+    if (tableConfig.getTableType() != TableType.OFFLINE) {
+      LOGGER.warn("Skip generating SegmentGenerationAndPushTask for non-OFFLINE table: {}", offlineTableName);
+      return ImmutableList.of();

Review Comment:
   I see your point. You mean that if a user calls this on a REALTIME table, the response should be a 4xx, and if a user passes a hybrid table name, the response will be an empty 2xx, right?





[GitHub] [pinot] mcvsubbu commented on a diff in pull request #8465: Add adhoc minion task creation endpoint

Posted by GitBox <gi...@apache.org>.
mcvsubbu commented on code in PR #8465:
URL: https://github.com/apache/pinot/pull/8465#discussion_r847525990


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java:
##########
@@ -27,8 +27,10 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import javax.annotation.Nullable;
+import javax.ws.rs.NotFoundException;

Review Comment:
   We have a class called TableNotFoundException. Can we use that for consistency? Also, it may be better not to introduce a dependency on javax.ws here.



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java:
##########
@@ -128,6 +132,68 @@ public PinotTaskManager(PinotHelixTaskResourceManager helixTaskResourceManager,
     }
   }
 
+  public Map<String, String> createTask(String taskType, String tableName, @Nullable String taskName,
+      Map<String, String> taskConfigs)
+      throws Exception {
+    if (taskName == null) {
+      taskName = tableName + "_" + UUID.randomUUID();
+      LOGGER.info("Task name is missing, auto-generate one: {}", taskName);
+    }
+    String minionInstanceTag =
+        taskConfigs.getOrDefault("minionInstanceTag", CommonConstants.Helix.UNTAGGED_MINION_INSTANCE);
+    String parentTaskName = _helixTaskResourceManager.getParentTaskName(taskType, taskName);
+    TaskState taskState = _helixTaskResourceManager.getTaskState(parentTaskName);
+    if (taskState != null) {
+      throw new RuntimeException(
+          "Task [" + taskName + "] of type [" + taskType + "] is already created. Current state is " + taskState);
+    }
+    List<String> tableNameWithTypes = new ArrayList<>();
+    if (TableNameBuilder.getTableTypeFromTableName(tableName) == null) {
+      String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(tableName);
+      if (_pinotHelixResourceManager.hasOfflineTable(offlineTableName)) {
+        tableNameWithTypes.add(offlineTableName);
+      }
+      String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(tableName);
+      if (_pinotHelixResourceManager.hasRealtimeTable(realtimeTableName)) {
+        tableNameWithTypes.add(realtimeTableName);
+      }
+    } else {
+      if (_pinotHelixResourceManager.hasTable(tableName)) {
+        tableNameWithTypes.add(tableName);
+      }
+    }
+    if (tableNameWithTypes.isEmpty()) {
+      throw new NotFoundException("'tableName' " + tableName + " is not found");

Review Comment:
   4xx?



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java:
##########
@@ -128,6 +132,68 @@ public PinotTaskManager(PinotHelixTaskResourceManager helixTaskResourceManager,
     }
   }
 
+  public Map<String, String> createTask(String taskType, String tableName, @Nullable String taskName,
+      Map<String, String> taskConfigs)
+      throws Exception {
+    if (taskName == null) {
+      taskName = tableName + "_" + UUID.randomUUID();
+      LOGGER.info("Task name is missing, auto-generate one: {}", taskName);
+    }
+    String minionInstanceTag =
+        taskConfigs.getOrDefault("minionInstanceTag", CommonConstants.Helix.UNTAGGED_MINION_INSTANCE);
+    String parentTaskName = _helixTaskResourceManager.getParentTaskName(taskType, taskName);
+    TaskState taskState = _helixTaskResourceManager.getTaskState(parentTaskName);
+    if (taskState != null) {
+      throw new RuntimeException(

Review Comment:
   I think this will result in a 5xx response, right? It should return a 4xx to the caller of `/tasks/execute`.
   
   The same concern applies to some of the other exceptions below.
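A minimal sketch of one way to get 4xx responses, assuming the REST layer translates exception types into HTTP status codes. `TaskErrorStatus` and the specific codes chosen (409 for a duplicate task name, 400 for bad input) are hypothetical, not existing Pinot code. Note that the duplicate-task check quoted above throws a plain `RuntimeException`, which this mapping would still report as 500; it would need a more specific type, such as `IllegalStateException`, to surface as a 4xx.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: translate exceptions thrown by the task manager into HTTP status codes,
// so caller mistakes surface as 4xx instead of a generic 500.
final class TaskErrorStatus {
  // Checked in insertion order; the first matching exception type wins.
  private static final Map<Class<? extends Exception>, Integer> STATUS_BY_TYPE =
      new LinkedHashMap<>();

  static {
    STATUS_BY_TYPE.put(IllegalStateException.class, 409);        // e.g. task name already in use
    STATUS_BY_TYPE.put(UnsupportedOperationException.class, 400); // e.g. unsupported task type
    STATUS_BY_TYPE.put(IllegalArgumentException.class, 400);      // e.g. missing or bad argument
  }

  private TaskErrorStatus() {
  }

  static int statusFor(Exception e) {
    for (Map.Entry<Class<? extends Exception>, Integer> entry : STATUS_BY_TYPE.entrySet()) {
      if (entry.getKey().isInstance(e)) {
        return entry.getValue();
      }
    }
    // Anything unexpected is still treated as a server error.
    return 500;
  }
}
```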



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/PinotTaskGenerator.java:
##########
@@ -45,6 +46,12 @@ public interface PinotTaskGenerator {
    */
   List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs);
 
+  /**
+   * Generates a list of adhoc tasks to schedule based on the given table configs and task configs.
+   */
+  List<PinotTaskConfig> generateAdhocTasks(TableConfig tableConfig, Map<String, String> taskConfigs)

Review Comment:
   Can we just call the method `generateTasks`?
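A sketch of how the ad hoc variant could share the `generateTasks` name as an overload, with a default implementation that rejects ad hoc runs. `TaskGeneratorSketch` and `TableConfigStub` are hypothetical stand-ins for `PinotTaskGenerator` and `TableConfig`; generated tasks are modeled as plain strings to keep the example self-contained.

```java
import java.util.List;
import java.util.Map;

// Hypothetical, trimmed-down stand-in for Pinot's TableConfig.
final class TableConfigStub {
  final String tableNameWithType;

  TableConfigStub(String tableNameWithType) {
    this.tableNameWithType = tableNameWithType;
  }
}

interface TaskGeneratorSketch {
  // Scheduled generation over many tables (the existing method shape).
  List<String> generateTasks(List<TableConfigStub> tableConfigs);

  // Ad hoc generation for one table, as an overload with the same name.
  // Generators that do not support ad hoc runs inherit this default.
  default List<String> generateTasks(TableConfigStub tableConfig, Map<String, String> taskConfigs)
      throws Exception {
    throw new UnsupportedOperationException("Ad hoc task generation is not supported");
  }
}
```

A default method keeps existing generators compiling without changes, which is the same role `BaseTaskGenerator` plays in this PR by throwing `UnsupportedOperationException`.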



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java:
##########
@@ -128,6 +132,68 @@ public PinotTaskManager(PinotHelixTaskResourceManager helixTaskResourceManager,
     }
   }
 
+  public Map<String, String> createTask(String taskType, String tableName, @Nullable String taskName,
+      Map<String, String> taskConfigs)
+      throws Exception {
+    if (taskName == null) {
+      taskName = tableName + "_" + UUID.randomUUID();
+      LOGGER.info("Task name is missing, auto-generate one: {}", taskName);
+    }
+    String minionInstanceTag =
+        taskConfigs.getOrDefault("minionInstanceTag", CommonConstants.Helix.UNTAGGED_MINION_INSTANCE);
+    String parentTaskName = _helixTaskResourceManager.getParentTaskName(taskType, taskName);
+    TaskState taskState = _helixTaskResourceManager.getTaskState(parentTaskName);
+    if (taskState != null) {
+      throw new RuntimeException(
+          "Task [" + taskName + "] of type [" + taskType + "] is already created. Current state is " + taskState);
+    }
+    List<String> tableNameWithTypes = new ArrayList<>();
+    if (TableNameBuilder.getTableTypeFromTableName(tableName) == null) {
+      String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(tableName);
+      if (_pinotHelixResourceManager.hasOfflineTable(offlineTableName)) {
+        tableNameWithTypes.add(offlineTableName);
+      }
+      String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(tableName);
+      if (_pinotHelixResourceManager.hasRealtimeTable(realtimeTableName)) {
+        tableNameWithTypes.add(realtimeTableName);
+      }
+    } else {
+      if (_pinotHelixResourceManager.hasTable(tableName)) {
+        tableNameWithTypes.add(tableName);
+      }
+    }
+    if (tableNameWithTypes.isEmpty()) {
+      throw new NotFoundException("'tableName' " + tableName + " is not found");
+    }
+
+    PinotTaskGenerator taskGenerator = _taskGeneratorRegistry.getTaskGenerator(taskType);
+    // Generate each type of tasks
+    if (taskGenerator == null) {
+      throw new UnsupportedOperationException(

Review Comment:
   4xx



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java:
##########
@@ -347,7 +368,11 @@ public synchronized Set<String> getTasksInProgress(String taskType) {
    */
   public synchronized TaskState getTaskState(String taskName) {
     String taskType = getTaskType(taskName);
-    return _taskDriver.getWorkflowContext(getHelixJobQueueName(taskType)).getJobState(getHelixJobName(taskName));
+    WorkflowContext workflowContext = _taskDriver.getWorkflowContext(getHelixJobQueueName(taskType));
+    if (workflowContext == null) {

Review Comment:
   I'm not sure whether you did this to fix a bug or just to cover the _ad hoc_ task functionality. But if this method can now return null, we should also check that the callers handle null: mark the return value nullable and verify each caller.
   
   Or maybe we can return `NOT_STARTED`?
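A sketch of the two options, using a hypothetical `TaskStateSketch` enum in place of Helix's `TaskState` and a nested map in place of the workflow contexts (a missing queue plays the role of a null `WorkflowContext`). `findTaskState` makes the unknown case explicit in the return type; `getTaskStateOrNotStarted` hides it behind `NOT_STARTED`.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for org.apache.helix.task.TaskState (only the values used here).
enum TaskStateSketch { NOT_STARTED, IN_PROGRESS, COMPLETED }

final class TaskStateLookup {
  // Job states keyed by queue name, then job name; a queue with no entry models a null
  // WorkflowContext.
  private final Map<String, Map<String, TaskStateSketch>> _statesByQueue;

  TaskStateLookup(Map<String, Map<String, TaskStateSketch>> statesByQueue) {
    _statesByQueue = statesByQueue;
  }

  // Option 1: "unknown" is explicit in the type, so every caller has to handle it.
  Optional<TaskStateSketch> findTaskState(String queueName, String jobName) {
    Map<String, TaskStateSketch> jobStates = _statesByQueue.get(queueName);
    if (jobStates == null) {
      return Optional.empty();
    }
    return Optional.ofNullable(jobStates.get(jobName));
  }

  // Option 2: "unknown" collapses into NOT_STARTED, so callers never see null.
  TaskStateSketch getTaskStateOrNotStarted(String queueName, String jobName) {
    return findTaskState(queueName, jobName).orElse(TaskStateSketch.NOT_STARTED);
  }
}
```

One thing to weigh with the second option: `createTask` currently treats any non-null state as "already created", so returning `NOT_STARTED` for unknown tasks would make that check reject every new task unless the check changes too.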



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java:
##########
@@ -611,6 +636,10 @@ private static String getTaskType(String name) {
     return name.split(TASK_NAME_SEPARATOR)[1];
   }
 
+  public String getParentTaskName(String taskType, String taskName) {
+    return TASK_PREFIX + taskType + TASK_NAME_SEPARATOR + taskName;

Review Comment:
   Is there any assumption anywhere in the split code that the `taskName` is a timestamp?
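A small sketch of the naming scheme, assuming `TASK_NAME_SEPARATOR` is `"_"` and `TASK_PREFIX` is `"Task_"` (assumed values, chosen so that `getTaskType`'s `split(...)[1]` returns the task type) and assuming scheduled task names end with a millisecond timestamp. It shows that extracting the type still works for an ad hoc name such as `myTable_OFFLINE_<uuid>`, while any parsing that expects the last token to be a timestamp would not. It also relies on the task type itself not containing the separator.

```java
// Hypothetical constants and helpers mirroring the naming methods in the diff above.
final class TaskNameSketch {
  static final String TASK_NAME_SEPARATOR = "_";
  static final String TASK_PREFIX = "Task" + TASK_NAME_SEPARATOR;

  private TaskNameSketch() {
  }

  static String getParentTaskName(String taskType, String taskName) {
    return TASK_PREFIX + taskType + TASK_NAME_SEPARATOR + taskName;
  }

  // Same parsing as the diff: the type is the second separator-delimited token.
  static boolean sameAs(String a, String b) {
    return a.equals(b);
  }

  static String getTaskType(String name) {
    return name.split(TASK_NAME_SEPARATOR)[1];
  }

  // Parsing that assumes the last token is a timestamp breaks for ad hoc task names.
  static boolean lastTokenIsTimestamp(String name) {
    String[] tokens = name.split(TASK_NAME_SEPARATOR);
    try {
      Long.parseLong(tokens[tokens.length - 1]);
      return true;
    } catch (NumberFormatException e) {
      return false;
    }
  }
}
```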



##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segmentgenerationandpush/SegmentGenerationAndPushTaskGenerator.java:
##########
@@ -177,6 +180,76 @@ public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
     return pinotTaskConfigs;
   }
 
+  @Override
+  public List<PinotTaskConfig> generateAdhocTasks(TableConfig tableConfig, Map<String, String> taskConfigs)
+      throws Exception {
+    String taskUUID = UUID.randomUUID().toString();
+    // Only generate tasks for OFFLINE tables
+    String offlineTableName = tableConfig.getTableName();
+    if (tableConfig.getTableType() != TableType.OFFLINE) {
+      LOGGER.warn("Skip generating SegmentGenerationAndPushTask for non-OFFLINE table: {}", offlineTableName);
+      return ImmutableList.of();

Review Comment:
   Shouldn't we throw an exception so that this error reaches the caller of the `/tasks/execute` API?



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/BaseTaskGenerator.java:
##########
@@ -68,4 +72,11 @@ public int getNumConcurrentTasksPerInstance() {
     }
     return JobConfig.DEFAULT_NUM_CONCURRENT_TASKS_PER_INSTANCE;
   }
+
+  @Override
+  public List<PinotTaskConfig> generateAdhocTasks(TableConfig tableConfig, Map<String, String> taskConfigs)
+      throws Exception {
+    throw new UnsupportedOperationException(

Review Comment:
   4xx?



##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segmentgenerationandpush/SegmentGenerationAndPushTaskGenerator.java:
##########
@@ -177,6 +180,76 @@ public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
     return pinotTaskConfigs;
   }
 
+  @Override
+  public List<PinotTaskConfig> generateAdhocTasks(TableConfig tableConfig, Map<String, String> taskConfigs)
+      throws Exception {
+    String taskUUID = UUID.randomUUID().toString();
+    // Only generate tasks for OFFLINE tables
+    String offlineTableName = tableConfig.getTableName();
+    if (tableConfig.getTableType() != TableType.OFFLINE) {
+      LOGGER.warn("Skip generating SegmentGenerationAndPushTask for non-OFFLINE table: {}", offlineTableName);
+      return ImmutableList.of();
+    }
+
+    // Override task configs from table with adhoc task configs.
+    Map<String, String> batchConfigMap = new HashMap<>();
+    TableTaskConfig tableTaskConfig = tableConfig.getTaskConfig();
+    if (tableTaskConfig != null) {
+      batchConfigMap.putAll(
+          tableTaskConfig.getConfigsForTaskType(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE));
+    }
+    batchConfigMap.putAll(taskConfigs);
+
+    int tableNumTasks = 0;
+    try {
+      URI inputDirURI =
+          SegmentGenerationUtils.getDirectoryURI(batchConfigMap.get(BatchConfigProperties.INPUT_DIR_URI));
+      List<URI> inputFileURIs = getInputFilesFromDirectory(batchConfigMap, inputDirURI, Collections.emptySet());
+      if (inputFileURIs.isEmpty()) {
+        LOGGER.warn("Skip generating SegmentGenerationAndPushTask, no input files found : {}", inputDirURI);
+        return ImmutableList.of();

Review Comment:
   Shouldn't we propagate this error to the caller of the API?



##########
pinot-controller/src/main/java/org/apache/pinot/controller/util/FileIngestionHelper.java:
##########
@@ -176,7 +176,7 @@ public static void copyURIToLocal(Map<String, String> batchConfigMap, URI source
     if (!PinotFSFactory.isSchemeSupported(sourceFileURIScheme)) {
       PinotFSFactory.register(sourceFileURIScheme, batchConfigMap.get(BatchConfigProperties.INPUT_FS_CLASS),
           IngestionConfigUtils.getInputFsProps(IngestionConfigUtils.getConfigMapWithPrefix(
-              batchConfigMap, BatchConfigProperties.INPUT_FS_PROP_PREFIX)));
+              batchConfigMap, BatchConfigProperties.INPUT_FS_PROP_PREFIX + IngestionConfigUtils.DOT_SEPARATOR)));

Review Comment:
   Isn't it better to keep this in a separate PR? That way, when we cut releases, it is easy to tell a bug fix apart from a feature.



##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segmentgenerationandpush/SegmentGenerationAndPushTaskGenerator.java:
##########
@@ -177,6 +180,76 @@ public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
     return pinotTaskConfigs;
   }
 
+  @Override
+  public List<PinotTaskConfig> generateAdhocTasks(TableConfig tableConfig, Map<String, String> taskConfigs)
+      throws Exception {
+    String taskUUID = UUID.randomUUID().toString();
+    // Only generate tasks for OFFLINE tables
+    String offlineTableName = tableConfig.getTableName();
+    if (tableConfig.getTableType() != TableType.OFFLINE) {
+      LOGGER.warn("Skip generating SegmentGenerationAndPushTask for non-OFFLINE table: {}", offlineTableName);
+      return ImmutableList.of();
+    }
+
+    // Override task configs from table with adhoc task configs.
+    Map<String, String> batchConfigMap = new HashMap<>();
+    TableTaskConfig tableTaskConfig = tableConfig.getTaskConfig();
+    if (tableTaskConfig != null) {
+      batchConfigMap.putAll(
+          tableTaskConfig.getConfigsForTaskType(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE));
+    }
+    batchConfigMap.putAll(taskConfigs);
+
+    int tableNumTasks = 0;
+    try {
+      URI inputDirURI =
+          SegmentGenerationUtils.getDirectoryURI(batchConfigMap.get(BatchConfigProperties.INPUT_DIR_URI));
+      List<URI> inputFileURIs = getInputFilesFromDirectory(batchConfigMap, inputDirURI, Collections.emptySet());
+      if (inputFileURIs.isEmpty()) {
+        LOGGER.warn("Skip generating SegmentGenerationAndPushTask, no input files found : {}", inputDirURI);
+        return ImmutableList.of();
+      }
+      if (!batchConfigMap.containsKey(BatchConfigProperties.INPUT_FORMAT)) {
+        batchConfigMap.put(BatchConfigProperties.INPUT_FORMAT,
+            extractFormatFromFileSuffix(inputFileURIs.get(0).getPath()));
+      }
+      updateRecordReaderConfigs(batchConfigMap);
+
+      List<PinotTaskConfig> pinotTaskConfigs = new ArrayList<>();
+      LOGGER.info("Final input files for task config generation: {}", inputFileURIs);

Review Comment:
   We should truncate the list of URIs if the list is too long.
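A hypothetical helper for this; `LogTruncation.truncatedForLog` is not an existing Pinot utility. It renders at most `maxItems` elements of the list followed by a count of the omitted ones, so a directory with thousands of input files does not flood the controller log.

```java
import java.util.List;

// Hypothetical logging helper: prints at most maxItems elements, then how many were omitted.
final class LogTruncation {
  private LogTruncation() {
  }

  static String truncatedForLog(List<?> items, int maxItems) {
    if (items.size() <= maxItems) {
      return items.toString();
    }
    int omitted = items.size() - maxItems;
    return items.subList(0, maxItems) + " ... (" + omitted + " more)";
  }
}
```

For example, `LOGGER.info("Final input files for task config generation: {}", LogTruncation.truncatedForLog(inputFileURIs, 10));`, where the limit of 10 is an arbitrary choice.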



##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segmentgenerationandpush/SegmentGenerationAndPushTaskExecutor.java:
##########
@@ -305,8 +303,8 @@ protected SegmentGenerationTaskSpec generateTaskSpec(Map<String, String> taskCon
     }
     SegmentNameGeneratorSpec segmentNameGeneratorSpec = new SegmentNameGeneratorSpec();
     segmentNameGeneratorSpec.setType(taskConfigs.get(BatchConfigProperties.SEGMENT_NAME_GENERATOR_TYPE));
-    segmentNameGeneratorSpec.setConfigs(IngestionConfigUtils
-        .getConfigMapWithPrefix(taskConfigs, BatchConfigProperties.SEGMENT_NAME_GENERATOR_PROP_PREFIX));
+    segmentNameGeneratorSpec.setConfigs(IngestionConfigUtils.getConfigMapWithPrefix(taskConfigs,
+        BatchConfigProperties.SEGMENT_NAME_GENERATOR_PROP_PREFIX + IngestionConfigUtils.DOT_SEPARATOR));

Review Comment:
   Same here: this fix also looks like it belongs in a separate PR.





[GitHub] [pinot] xiangfu0 commented on a diff in pull request #8465: Add adhoc minion task creation endpoint

Posted by GitBox <gi...@apache.org>.
xiangfu0 commented on code in PR #8465:
URL: https://github.com/apache/pinot/pull/8465#discussion_r848721783


##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segmentgenerationandpush/SegmentGenerationAndPushTaskGenerator.java:
##########
@@ -177,6 +180,76 @@ public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
     return pinotTaskConfigs;
   }
 
+  @Override
+  public List<PinotTaskConfig> generateAdhocTasks(TableConfig tableConfig, Map<String, String> taskConfigs)
+      throws Exception {
+    String taskUUID = UUID.randomUUID().toString();
+    // Only generate tasks for OFFLINE tables
+    String offlineTableName = tableConfig.getTableName();
+    if (tableConfig.getTableType() != TableType.OFFLINE) {
+      LOGGER.warn("Skip generating SegmentGenerationAndPushTask for non-OFFLINE table: {}", offlineTableName);
+      return ImmutableList.of();

Review Comment:
   The outer caller handles an empty list by not adding that table to the results map.
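A sketch of the behavior described, with a hypothetical `AdhocResults.collect` in place of the loop in `createTask` and a function standing in for the task generator. A table whose generator returns an empty list gets no entry, so a hybrid table whose REALTIME half is skipped still yields a successful but partial result.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch of the outer loop: only tables that produced at least one task are recorded.
final class AdhocResults {
  private AdhocResults() {
  }

  static Map<String, List<String>> collect(List<String> tableNamesWithType,
      Function<String, List<String>> generator) {
    Map<String, List<String>> results = new LinkedHashMap<>();
    for (String tableNameWithType : tableNamesWithType) {
      List<String> tasks = generator.apply(tableNameWithType);
      // An empty list means the generator skipped this table; leave it out of the results.
      if (!tasks.isEmpty()) {
        results.put(tableNameWithType, tasks);
      }
    }
    return results;
  }
}
```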





[GitHub] [pinot] xiangfu0 commented on a diff in pull request #8465: Add adhoc minion task creation endpoint

Posted by GitBox <gi...@apache.org>.
xiangfu0 commented on code in PR #8465:
URL: https://github.com/apache/pinot/pull/8465#discussion_r843346735


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java:
##########
@@ -128,6 +132,53 @@ public PinotTaskManager(PinotHelixTaskResourceManager helixTaskResourceManager,
     }
   }
 
+  public String createTask(Map<String, String> taskConfigs)
+      throws Exception {
+    String taskName = taskConfigs.get("taskName");
+    if (taskName == null) {
+      throw new IllegalArgumentException("Missing field 'taskName'");
+    }
+    String taskType = taskConfigs.get("taskType");
+    if (taskType == null) {
+      throw new IllegalArgumentException("Missing field 'taskType'");
+    }
+    String tableName = taskConfigs.get("tableName");
+    if (tableName == null) {
+      throw new IllegalArgumentException("Missing field 'tableName'");
+    }
+    String parentTaskName = _helixTaskResourceManager.getParentTaskName(taskType, taskName);
+    TaskState taskState = _helixTaskResourceManager.getTaskState(parentTaskName);
+    if (taskState != null) {
+      throw new RuntimeException(
+          "Task [" + taskName + "] of type [" + taskType + "] is already created. Current state is " + taskState);
+    }
+    if (TableNameBuilder.isRealtimeTableResource(tableName)) {

Review Comment:
   Makes sense; will support offline, realtime, and hybrid tables here.
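A self-contained sketch of the offline/realtime/hybrid resolution, mirroring the `tableNameWithTypes` logic in the later revision of `createTask` quoted earlier in this thread. It assumes typed names end in `_OFFLINE` or `_REALTIME` (the convention `TableNameBuilder` follows) and replaces the `PinotHelixResourceManager` lookups with a hypothetical `existingTables` set.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical sketch: a raw name expands to whichever typed tables exist (hybrid -> both),
// while a typed name is used as-is if it exists.
final class TableNameResolution {
  private TableNameResolution() {
  }

  static List<String> resolve(String tableName, Set<String> existingTables) {
    List<String> tableNamesWithType = new ArrayList<>();
    if (tableName.endsWith("_OFFLINE") || tableName.endsWith("_REALTIME")) {
      if (existingTables.contains(tableName)) {
        tableNamesWithType.add(tableName);
      }
      return tableNamesWithType;
    }
    for (String suffix : new String[]{"_OFFLINE", "_REALTIME"}) {
      String candidate = tableName + suffix;
      if (existingTables.contains(candidate)) {
        tableNamesWithType.add(candidate);
      }
    }
    // An empty result is the "table not found" case.
    return tableNamesWithType;
  }
}
```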





[GitHub] [pinot] xiangfu0 commented on a diff in pull request #8465: Add adhoc minion task creation endpoint

Posted by GitBox <gi...@apache.org>.
xiangfu0 commented on code in PR #8465:
URL: https://github.com/apache/pinot/pull/8465#discussion_r843361324


##########
pinot-controller/src/main/java/org/apache/pinot/controller/util/FileIngestionHelper.java:
##########
@@ -176,7 +176,7 @@ public static void copyURIToLocal(Map<String, String> batchConfigMap, URI source
     if (!PinotFSFactory.isSchemeSupported(sourceFileURIScheme)) {
       PinotFSFactory.register(sourceFileURIScheme, batchConfigMap.get(BatchConfigProperties.INPUT_FS_CLASS),
           IngestionConfigUtils.getInputFsProps(IngestionConfigUtils.getConfigMapWithPrefix(
-              batchConfigMap, BatchConfigProperties.INPUT_FS_PROP_PREFIX)));
+              batchConfigMap, BatchConfigProperties.INPUT_FS_PROP_PREFIX + IngestionConfigUtils.DOT_SEPARATOR)));

Review Comment:
   This is a bug I found while testing the S3 configs.
   Without the separator in the prefix, every generated input FS prop key starts with DOT_SEPARATOR.
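To illustrate the bug, here is a hypothetical re-implementation of prefix stripping (not the actual `IngestionConfigUtils.getConfigMapWithPrefix` source), assuming the prefix value is `input.fs.prop`. Passing the prefix without its trailing `.` leaves every stripped key starting with the separator.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: keeps entries whose key starts with the prefix and strips the prefix off.
final class PrefixStripping {
  private PrefixStripping() {
  }

  static Map<String, String> configMapWithPrefix(Map<String, String> configs, String prefix) {
    Map<String, String> result = new HashMap<>();
    for (Map.Entry<String, String> entry : configs.entrySet()) {
      if (entry.getKey().startsWith(prefix)) {
        result.put(entry.getKey().substring(prefix.length()), entry.getValue());
      }
    }
    return result;
  }
}
```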





[GitHub] [pinot] mcvsubbu commented on a diff in pull request #8465: Add adhoc minion task creation endpoint

Posted by GitBox <gi...@apache.org>.
mcvsubbu commented on code in PR #8465:
URL: https://github.com/apache/pinot/pull/8465#discussion_r843277312


##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java:
##########
@@ -346,6 +347,29 @@ public StringResultResponse getTaskStateDeprecated(
     }
   }
 
+  @GET
+  @Path("/tasks/all")
+  @Authenticate(AccessType.READ)
+  @ApiOperation("Read all tasks status")
+  public Map<String, TaskState> getAllTasks() {
+    Map<String, TaskState> taskStates = new HashMap<>();
+    _pinotHelixTaskResourceManager.getTaskTypes()
+        .forEach(taskType -> taskStates.putAll(_pinotHelixTaskResourceManager.getTaskStates(taskType)));
+    return taskStates;
+  }
+
+  @POST
+  @Path("/tasks/create")
+  @Authenticate(AccessType.CREATE)

Review Comment:
   Instead of taking a generic map, can we add specific arguments here? That way the arguments are self-documenting, instead of users discovering the required arguments one at a time from the exceptions thrown.
   
   Or maybe this can be a table API, since we always want to run tasks on a particular table.
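A sketch of the explicit-arguments idea: a hypothetical `AdhocTaskRequest` holds the required fields directly and reports every missing one in a single error, instead of failing on the first. Field names follow the keys used in the diff (`taskType`, `tableName`); `taskName` is left out because the later revision treats it as optional and auto-generates it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical request object for the ad hoc endpoint with explicit required fields.
final class AdhocTaskRequest {
  final String taskType;
  final String tableName;
  final Map<String, String> taskConfigs;

  AdhocTaskRequest(String taskType, String tableName, Map<String, String> taskConfigs) {
    this.taskType = taskType;
    this.tableName = tableName;
    this.taskConfigs = taskConfigs;
  }

  // Collects all missing required fields rather than stopping at the first one.
  List<String> missingFields() {
    List<String> missing = new ArrayList<>();
    if (taskType == null || taskType.isEmpty()) {
      missing.add("taskType");
    }
    if (tableName == null || tableName.isEmpty()) {
      missing.add("tableName");
    }
    return missing;
  }

  void validate() {
    List<String> missing = missingFields();
    if (!missing.isEmpty()) {
      throw new IllegalArgumentException("Missing required field(s): " + String.join(", ", missing));
    }
  }
}
```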



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java:
##########
@@ -346,6 +347,29 @@ public StringResultResponse getTaskStateDeprecated(
     }
   }
 
+  @GET
+  @Path("/tasks/all")
+  @Authenticate(AccessType.READ)
+  @ApiOperation("Read all tasks status")
+  public Map<String, TaskState> getAllTasks() {
+    Map<String, TaskState> taskStates = new HashMap<>();
+    _pinotHelixTaskResourceManager.getTaskTypes()
+        .forEach(taskType -> taskStates.putAll(_pinotHelixTaskResourceManager.getTaskStates(taskType)));
+    return taskStates;
+  }
+
+  @POST
+  @Path("/tasks/create")

Review Comment:
   I think what you want is to _run_ a specific task type that has already been defined, correct? If so, can we rename the API to "run" (or "execute")?



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java:
##########
@@ -346,6 +347,29 @@ public StringResultResponse getTaskStateDeprecated(
     }
   }
 
+  @GET
+  @Path("/tasks/all")

Review Comment:
   Can we do that in the client instead, i.e., fetch all task types and then iterate through them to get the task states?
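A client-side sketch of that aggregation. `TaskClient` is a hypothetical interface whose two methods stand for the existing per-type controller calls (list the task types, then get the task states for one type); task states are plain strings here to keep it self-contained.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical client wrapper; method names are placeholders, not a real Pinot client API.
interface TaskClient {
  Set<String> listTaskTypes();

  Map<String, String> getTaskStates(String taskType);
}

final class AllTaskStates {
  private AllTaskStates() {
  }

  // Same aggregation as the proposed /tasks/all endpoint, done on the client side.
  static Map<String, String> fetchAll(TaskClient client) {
    Map<String, String> taskStates = new HashMap<>();
    for (String taskType : client.listTaskTypes()) {
      taskStates.putAll(client.getTaskStates(taskType));
    }
    return taskStates;
  }
}
```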



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java:
##########
@@ -128,6 +132,53 @@ public PinotTaskManager(PinotHelixTaskResourceManager helixTaskResourceManager,
     }
   }
 
+  public String createTask(Map<String, String> taskConfigs)
+      throws Exception {
+    String taskName = taskConfigs.get("taskName");
+    if (taskName == null) {
+      throw new IllegalArgumentException("Missing field 'taskName'");
+    }
+    String taskType = taskConfigs.get("taskType");
+    if (taskType == null) {
+      throw new IllegalArgumentException("Missing field 'taskType'");
+    }
+    String tableName = taskConfigs.get("tableName");
+    if (tableName == null) {
+      throw new IllegalArgumentException("Missing field 'tableName'");
+    }
+    String parentTaskName = _helixTaskResourceManager.getParentTaskName(taskType, taskName);
+    TaskState taskState = _helixTaskResourceManager.getTaskState(parentTaskName);
+    if (taskState != null) {
+      throw new RuntimeException(
+          "Task [" + taskName + "] of type [" + taskType + "] is already created. Current state is " + taskState);
+    }
+    if (TableNameBuilder.isRealtimeTableResource(tableName)) {

Review Comment:
   Why this limitation? Let the task decide whether it can work on realtime tables or not.
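One hypothetical way to let the task decide: each generator advertises the table types it supports, and the manager checks that instead of hard-coding a REALTIME rejection. `SupportsTableTypes`, `OfflineOnlyGenerator`, and `TableTypeSketch` are illustrative names, and the table type is inferred from the `_REALTIME` suffix of a typed table name.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical table types, standing in for Pinot's TableType.
enum TableTypeSketch { OFFLINE, REALTIME }

// Hypothetical capability interface: by default a generator accepts every table type.
interface SupportsTableTypes {
  default Set<TableTypeSketch> supportedTableTypes() {
    return EnumSet.allOf(TableTypeSketch.class);
  }

  // Infers the type from the typed table name's suffix and checks it against the supported set.
  default boolean supports(String tableNameWithType) {
    TableTypeSketch type =
        tableNameWithType.endsWith("_REALTIME") ? TableTypeSketch.REALTIME : TableTypeSketch.OFFLINE;
    return supportedTableTypes().contains(type);
  }
}

// A generator in the spirit of SegmentGenerationAndPushTaskGenerator, which only handles OFFLINE.
final class OfflineOnlyGenerator implements SupportsTableTypes {
  @Override
  public Set<TableTypeSketch> supportedTableTypes() {
    return EnumSet.of(TableTypeSketch.OFFLINE);
  }
}
```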



##########
pinot-controller/src/main/java/org/apache/pinot/controller/util/FileIngestionHelper.java:
##########
@@ -176,7 +176,7 @@ public static void copyURIToLocal(Map<String, String> batchConfigMap, URI source
     if (!PinotFSFactory.isSchemeSupported(sourceFileURIScheme)) {
       PinotFSFactory.register(sourceFileURIScheme, batchConfigMap.get(BatchConfigProperties.INPUT_FS_CLASS),
           IngestionConfigUtils.getInputFsProps(IngestionConfigUtils.getConfigMapWithPrefix(
-              batchConfigMap, BatchConfigProperties.INPUT_FS_PROP_PREFIX)));
+              batchConfigMap, BatchConfigProperties.INPUT_FS_PROP_PREFIX + IngestionConfigUtils.DOT_SEPARATOR)));

Review Comment:
   How is this related to adding an ad hoc task API? It is better to keep bug fixes independent.





[GitHub] [pinot] klsince commented on a diff in pull request #8465: Add adhoc minion task creation endpoint

Posted by GitBox <gi...@apache.org>.
klsince commented on code in PR #8465:
URL: https://github.com/apache/pinot/pull/8465#discussion_r843229599


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java:
##########
@@ -128,6 +132,53 @@ public PinotTaskManager(PinotHelixTaskResourceManager helixTaskResourceManager,
     }
   }
 
+  public String createTask(Map<String, String> taskConfigs)
+      throws Exception {
+    String taskName = taskConfigs.get("taskName");
+    if (taskName == null) {
+      throw new IllegalArgumentException("Missing field 'taskName'");
+    }
+    String taskType = taskConfigs.get("taskType");
+    if (taskType == null) {
+      throw new IllegalArgumentException("Missing field 'taskType'");
+    }
+    String tableName = taskConfigs.get("tableName");
+    if (tableName == null) {
+      throw new IllegalArgumentException("Missing field 'tableName'");
+    }
+    String parentTaskName = _helixTaskResourceManager.getParentTaskName(taskType, taskName);
+    TaskState taskState = _helixTaskResourceManager.getTaskState(parentTaskName);
+    if (taskState != null) {
+      throw new RuntimeException(
+          "Task [" + taskName + "] of type [" + taskType + "] is already created. Current state is " + taskState);
+    }
+    if (TableNameBuilder.isRealtimeTableResource(tableName)) {
+      throw new UnsupportedOperationException(
+          "Realtime table: " + tableName + " is not supported for task type - " + taskType);
+    }
+    String tableNameWithType =
+        TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(TableNameBuilder.extractRawTableName(tableName));
+    TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType);
+    PinotTaskGenerator taskGenerator = _taskGeneratorRegistry.getTaskGenerator(taskType);
+    // Generate each type of tasks
+    if (taskGenerator != null) {
+      _helixTaskResourceManager.ensureTaskQueueExists(taskType);
+      addTaskTypeMetricsUpdaterIfNeeded(taskType);
+      LOGGER.info("Trying to create a task type: {}", taskGenerator.getTaskType());
+      List<PinotTaskConfig> pinotTaskConfigs = taskGenerator.generateAdhocTasks(tableConfig, taskConfigs);
+      if (pinotTaskConfigs.isEmpty()) {
+        throw new NotFoundException("No task config has been generated");
+      }
+      LOGGER.info("Submitting ad-hoc task for task type: {} with task configs: {}", taskType, pinotTaskConfigs);
+      _controllerMetrics.addMeteredTableValue(taskType, ControllerMeter.NUMBER_TASKS_SUBMITTED, 1);

Review Comment:
   use a specific meter for submitting ad-hoc tasks? 



##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segmentgenerationandpush/SegmentGenerationAndPushTaskGenerator.java:
##########
@@ -177,6 +180,78 @@ public String getTaskType() {
     return pinotTaskConfigs;
   }
 
+  @Override
+  public List<PinotTaskConfig> generateAdhocTasks(TableConfig tableConfig, Map<String, String> taskConfigs)
+      throws Exception {
+    String taskUUID = UUID.randomUUID().toString();
+    // Only generate tasks for OFFLINE tables
+    String offlineTableName = tableConfig.getTableName();
+    if (tableConfig.getTableType() != TableType.OFFLINE) {
+      LOGGER.warn("Skip generating SegmentGenerationAndPushTask for non-OFFLINE table: {}", offlineTableName);
+      return ImmutableList.of();
+    }
+
+    TableTaskConfig tableTaskConfig = tableConfig.getTaskConfig();
+    if (tableTaskConfig != null) {
+      Map<String, String> taskConfigsFromTable =
+          tableTaskConfig.getConfigsForTaskType(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE);
+      if (taskConfigsFromTable != null) {
+        taskConfigsFromTable.entrySet().forEach(entry -> taskConfigs.putIfAbsent(entry.getKey(), entry.getValue()));
+      }
+    }
+    Map<String, String> batchConfigMap = new HashMap<>(taskConfigs);
+    int tableNumTasks = 0;
+    try {
+      URI inputDirURI =
+          SegmentGenerationUtils.getDirectoryURI(batchConfigMap.get(BatchConfigProperties.INPUT_DIR_URI));
+      List<URI> inputFileURIs = getInputFilesFromDirectory(batchConfigMap, inputDirURI, Collections.emptySet());
+      if (inputFileURIs.isEmpty()) {
+        LOGGER.warn("Skip generating SegmentGenerationAndPushTask, no input files found : {}", inputDirURI);
+        return ImmutableList.of();
+      }
+      if (!batchConfigMap.containsKey(BatchConfigProperties.INPUT_FORMAT)) {
+        batchConfigMap.put(BatchConfigProperties.INPUT_FORMAT,
+            extractFormatFromFileSuffix(inputFileURIs.get(0).getPath()));
+      }
+      updateRecordReaderConfigs(batchConfigMap);
+
+      List<PinotTaskConfig> pinotTaskConfigs = new ArrayList<>();
+      LOGGER.info("Final input files for task config generation: {}", inputFileURIs);
+      for (URI inputFileURI : inputFileURIs) {
+        Map<String, String> singleFileGenerationTaskConfig =
+            getSingleFileGenerationTaskConfig(offlineTableName, tableNumTasks, batchConfigMap, inputFileURI,
+                generateFixedSegmentName(offlineTableName, taskUUID, tableNumTasks));
+        pinotTaskConfigs.add(new PinotTaskConfig(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE,
+            singleFileGenerationTaskConfig));
+        tableNumTasks++;
+      }
+      if (!batchConfigMap.containsKey(BatchConfigProperties.INPUT_FORMAT)) {
+        batchConfigMap.put(BatchConfigProperties.INPUT_FORMAT,
+            extractFormatFromFileSuffix(inputFileURIs.get(0).getPath()));
+        updateRecordReaderConfigs(batchConfigMap);
+      }
+      return pinotTaskConfigs;
+    } catch (Exception e) {
+      LOGGER.error("Unable to generate the SegmentGenerationAndPush task. [ table configs: {}, task configs: {} ]",
+          tableConfig, taskConfigs, e);
+      throw e;
+    }
+  }
+
+  private String generateFixedSegmentName(String offlineTableName, String taskUUID, int tableNumTasks) {
+    return String.format("%s_%s_%d", offlineTableName, taskUUID, tableNumTasks);
+  }
+
+  private String extractFormatFromFileSuffix(String path) {
+    String fileExtension = path.substring(path.lastIndexOf(".") + 1);
+    switch (fileExtension) {
+      case "":
+        throw new UnsupportedOperationException("No file extension found");

Review Comment:
   nit: use `if (StringUtils.isEmpty(...))` for simplicity?
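Below is a minimal sketch of the nit. It assumes plain `String.isEmpty()` is an acceptable stand-in for Commons Lang's `StringUtils.isEmpty`, which behaves the same here but is also null-safe. The quoted `case ""` branch misses one case. `lastIndexOf(".")` returns -1 when the path has no dot, so `substring(0)` returns the whole path rather than an empty string. That branch therefore only fires for paths ending in `.`. The class name, method names, and list of supported formats are hypothetical.

```java
import java.util.Locale;

// Hypothetical helper illustrating the review nit; not Pinot's actual implementation.
final class FileFormatSketch {
  private FileFormatSketch() {
  }

  // Returns the extension of the last path segment, or "" if it has none.
  static String fileExtension(String path) {
    // Look only at the file name so dots in bucket or directory names are ignored.
    String fileName = path.substring(path.lastIndexOf('/') + 1);
    int dot = fileName.lastIndexOf('.');
    // lastIndexOf returns -1 when there is no dot; report "no extension" explicitly
    // instead of letting substring(0) return the whole file name.
    return dot < 0 ? "" : fileName.substring(dot + 1);
  }

  static String extractFormatFromFileSuffix(String path) {
    String extension = fileExtension(path).toLowerCase(Locale.ROOT);
    if (extension.isEmpty()) {
      throw new UnsupportedOperationException("No file extension found in: " + path);
    }
    // Assumed set of supported formats, for illustration only.
    switch (extension) {
      case "avro":
      case "csv":
      case "json":
      case "orc":
      case "parquet":
        return extension;
      default:
        throw new UnsupportedOperationException("Unsupported file extension: " + extension);
    }
  }
}
```

Checking the file name rather than the full URI also avoids picking up a dot in a bucket name such as `s3://my.bucket/dir/file`.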



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java:
##########
@@ -128,6 +132,53 @@ public PinotTaskManager(PinotHelixTaskResourceManager helixTaskResourceManager,
     }
   }
 
+  public String createTask(Map<String, String> taskConfigs)
+      throws Exception {
+    String taskName = taskConfigs.get("taskName");
+    if (taskName == null) {
+      throw new IllegalArgumentException("Missing field 'taskName'");
+    }
+    String taskType = taskConfigs.get("taskType");
+    if (taskType == null) {
+      throw new IllegalArgumentException("Missing field 'taskType'");
+    }
+    String tableName = taskConfigs.get("tableName");
+    if (tableName == null) {
+      throw new IllegalArgumentException("Missing field 'tableName'");
+    }
+    String parentTaskName = _helixTaskResourceManager.getParentTaskName(taskType, taskName);
+    TaskState taskState = _helixTaskResourceManager.getTaskState(parentTaskName);
+    if (taskState != null) {
+      throw new RuntimeException(
+          "Task [" + taskName + "] of type [" + taskType + "] is already created. Current state is " + taskState);
+    }
+    if (TableNameBuilder.isRealtimeTableResource(tableName)) {
+      throw new UnsupportedOperationException(
+          "Realtime table: " + tableName + " is not supported for task type - " + taskType);
+    }
+    String tableNameWithType =
+        TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(TableNameBuilder.extractRawTableName(tableName));
+    TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType);
+    PinotTaskGenerator taskGenerator = _taskGeneratorRegistry.getTaskGenerator(taskType);
+    // Generate each type of tasks
+    if (taskGenerator != null) {
+      _helixTaskResourceManager.ensureTaskQueueExists(taskType);
+      addTaskTypeMetricsUpdaterIfNeeded(taskType);
+      LOGGER.info("Trying to create a task type: {}", taskGenerator.getTaskType());

Review Comment:
   nit: "... to create tasks of type: {}"



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java:
##########
@@ -128,6 +132,53 @@ public PinotTaskManager(PinotHelixTaskResourceManager helixTaskResourceManager,
     }
   }
 
+  public String createTask(Map<String, String> taskConfigs)
+      throws Exception {
+    String taskName = taskConfigs.get("taskName");
+    if (taskName == null) {
+      throw new IllegalArgumentException("Missing field 'taskName'");
+    }
+    String taskType = taskConfigs.get("taskType");
+    if (taskType == null) {
+      throw new IllegalArgumentException("Missing field 'taskType'");
+    }
+    String tableName = taskConfigs.get("tableName");
+    if (tableName == null) {
+      throw new IllegalArgumentException("Missing field 'tableName'");
+    }
+    String parentTaskName = _helixTaskResourceManager.getParentTaskName(taskType, taskName);
+    TaskState taskState = _helixTaskResourceManager.getTaskState(parentTaskName);
+    if (taskState != null) {
+      throw new RuntimeException(
+          "Task [" + taskName + "] of type [" + taskType + "] is already created. Current state is " + taskState);
+    }
+    if (TableNameBuilder.isRealtimeTableResource(tableName)) {

Review Comment:
   can do this check at the beginning to bail out sooner
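Below is a minimal sketch of the suggested ordering. The cheap, purely local checks run first, and the task-state lookup only happens once they pass. `taskExists` is a hypothetical stand-in for the Helix task-state query. The `_REALTIME` suffix test stands in for `TableNameBuilder.isRealtimeTableResource`.

```java
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical sketch of the suggested check ordering; not the PR's actual createTask.
final class AdhocTaskValidation {
  private AdhocTaskValidation() {
  }

  static String requireField(Map<String, String> taskConfigs, String key) {
    String value = taskConfigs.get(key);
    if (value == null) {
      throw new IllegalArgumentException("Missing field '" + key + "'");
    }
    return value;
  }

  // Local argument checks first; `taskExists` (the remote lookup) is consulted last.
  static void validate(Map<String, String> taskConfigs, Predicate<String> taskExists) {
    String taskName = requireField(taskConfigs, "taskName");
    String taskType = requireField(taskConfigs, "taskType");
    String tableName = requireField(taskConfigs, "tableName");
    // Stand-in for TableNameBuilder.isRealtimeTableResource(tableName).
    if (tableName.endsWith("_REALTIME")) {
      throw new UnsupportedOperationException(
          "Realtime table: " + tableName + " is not supported for task type - " + taskType);
    }
    if (taskExists.test(taskName)) {
      throw new IllegalStateException("Task [" + taskName + "] of type [" + taskType + "] already exists");
    }
  }
}
```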



##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segmentgenerationandpush/SegmentGenerationAndPushTaskGenerator.java:
##########
@@ -177,6 +180,78 @@ public String getTaskType() {
     return pinotTaskConfigs;
   }
 
+  @Override
+  public List<PinotTaskConfig> generateAdhocTasks(TableConfig tableConfig, Map<String, String> taskConfigs)
+      throws Exception {
+    String taskUUID = UUID.randomUUID().toString();
+    // Only generate tasks for OFFLINE tables
+    String offlineTableName = tableConfig.getTableName();
+    if (tableConfig.getTableType() != TableType.OFFLINE) {
+      LOGGER.warn("Skip generating SegmentGenerationAndPushTask for non-OFFLINE table: {}", offlineTableName);
+      return ImmutableList.of();
+    }
+
+    TableTaskConfig tableTaskConfig = tableConfig.getTaskConfig();
+    if (tableTaskConfig != null) {
+      Map<String, String> taskConfigsFromTable =
+          tableTaskConfig.getConfigsForTaskType(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE);
+      if (taskConfigsFromTable != null) {
+        taskConfigsFromTable.entrySet().forEach(entry -> taskConfigs.putIfAbsent(entry.getKey(), entry.getValue()));

Review Comment:
   Should user-provided task configs override those from TableConfig instead? Either way, perhaps leave a comment explaining how the configs are merged.
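The quoted code already gives user-provided values precedence. `putIfAbsent` only copies a table-level value when the ad-hoc map lacks that key. It does, however, mutate the caller's `taskConfigs` map in place. Below is a minimal sketch that makes the precedence explicit and leaves both inputs untouched. The class and method names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper for merging ad-hoc task configs with table-level task configs.
final class TaskConfigMerge {
  private TaskConfigMerge() {
  }

  // Table-level configs provide defaults; user-provided (ad-hoc) configs win on conflicts.
  // A new map is returned, so neither input map is modified.
  static Map<String, String> merge(Map<String, String> tableTaskConfigs, Map<String, String> userTaskConfigs) {
    Map<String, String> merged = new HashMap<>();
    if (tableTaskConfigs != null) {
      merged.putAll(tableTaskConfigs);
    }
    merged.putAll(userTaskConfigs);
    return merged;
  }
}
```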



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java:
##########
@@ -128,6 +132,53 @@ public PinotTaskManager(PinotHelixTaskResourceManager helixTaskResourceManager,
     }
   }
 
+  public String createTask(Map<String, String> taskConfigs)
+      throws Exception {
+    String taskName = taskConfigs.get("taskName");
+    if (taskName == null) {
+      throw new IllegalArgumentException("Missing field 'taskName'");
+    }
+    String taskType = taskConfigs.get("taskType");
+    if (taskType == null) {
+      throw new IllegalArgumentException("Missing field 'taskType'");
+    }
+    String tableName = taskConfigs.get("tableName");
+    if (tableName == null) {
+      throw new IllegalArgumentException("Missing field 'tableName'");
+    }
+    String parentTaskName = _helixTaskResourceManager.getParentTaskName(taskType, taskName);
+    TaskState taskState = _helixTaskResourceManager.getTaskState(parentTaskName);
+    if (taskState != null) {
+      throw new RuntimeException(
+          "Task [" + taskName + "] of type [" + taskType + "] is already created. Current state is " + taskState);
+    }
+    if (TableNameBuilder.isRealtimeTableResource(tableName)) {
+      throw new UnsupportedOperationException(
+          "Realtime table: " + tableName + " is not supported for task type - " + taskType);
+    }
+    String tableNameWithType =
+        TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(TableNameBuilder.extractRawTableName(tableName));
+    TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType);
+    PinotTaskGenerator taskGenerator = _taskGeneratorRegistry.getTaskGenerator(taskType);
+    // Generate each type of tasks
+    if (taskGenerator != null) {
+      _helixTaskResourceManager.ensureTaskQueueExists(taskType);
+      addTaskTypeMetricsUpdaterIfNeeded(taskType);
+      LOGGER.info("Trying to create a task type: {}", taskGenerator.getTaskType());
+      List<PinotTaskConfig> pinotTaskConfigs = taskGenerator.generateAdhocTasks(tableConfig, taskConfigs);
+      if (pinotTaskConfigs.isEmpty()) {
+        throw new NotFoundException("No task config has been generated");
+      }
+      LOGGER.info("Submitting ad-hoc task for task type: {} with task configs: {}", taskType, pinotTaskConfigs);
+      _controllerMetrics.addMeteredTableValue(taskType, ControllerMeter.NUMBER_TASKS_SUBMITTED, 1);
+      return _helixTaskResourceManager.submitTask(parentTaskName, pinotTaskConfigs,
+          CommonConstants.Helix.UNTAGGED_MINION_INSTANCE, taskGenerator.getTaskTimeoutMs(),
+          taskGenerator.getNumConcurrentTasksPerInstance());
+    }
+    throw new UnsupportedOperationException(

Review Comment:
   nit: to save some indents
   ```
   if (taskGenerator == null) {
     throw new ...
   } 
   ... the logic to submit adhoc tasks ...
   ```



##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segmentgenerationandpush/SegmentGenerationAndPushTaskGenerator.java:
##########
@@ -177,6 +180,78 @@ public String getTaskType() {
     return pinotTaskConfigs;
   }
 
+  @Override
+  public List<PinotTaskConfig> generateAdhocTasks(TableConfig tableConfig, Map<String, String> taskConfigs)
+      throws Exception {
+    String taskUUID = UUID.randomUUID().toString();
+    // Only generate tasks for OFFLINE tables
+    String offlineTableName = tableConfig.getTableName();
+    if (tableConfig.getTableType() != TableType.OFFLINE) {
+      LOGGER.warn("Skip generating SegmentGenerationAndPushTask for non-OFFLINE table: {}", offlineTableName);
+      return ImmutableList.of();
+    }
+
+    TableTaskConfig tableTaskConfig = tableConfig.getTaskConfig();
+    if (tableTaskConfig != null) {
+      Map<String, String> taskConfigsFromTable =
+          tableTaskConfig.getConfigsForTaskType(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE);
+      if (taskConfigsFromTable != null) {
+        taskConfigsFromTable.entrySet().forEach(entry -> taskConfigs.putIfAbsent(entry.getKey(), entry.getValue()));
+      }
+    }
+    Map<String, String> batchConfigMap = new HashMap<>(taskConfigs);
+    int tableNumTasks = 0;
+    try {
+      URI inputDirURI =
+          SegmentGenerationUtils.getDirectoryURI(batchConfigMap.get(BatchConfigProperties.INPUT_DIR_URI));
+      List<URI> inputFileURIs = getInputFilesFromDirectory(batchConfigMap, inputDirURI, Collections.emptySet());
+      if (inputFileURIs.isEmpty()) {
+        LOGGER.warn("Skip generating SegmentGenerationAndPushTask, no input files found : {}", inputDirURI);
+        return ImmutableList.of();
+      }
+      if (!batchConfigMap.containsKey(BatchConfigProperties.INPUT_FORMAT)) {
+        batchConfigMap.put(BatchConfigProperties.INPUT_FORMAT,
+            extractFormatFromFileSuffix(inputFileURIs.get(0).getPath()));
+      }
+      updateRecordReaderConfigs(batchConfigMap);
+
+      List<PinotTaskConfig> pinotTaskConfigs = new ArrayList<>();
+      LOGGER.info("Final input files for task config generation: {}", inputFileURIs);
+      for (URI inputFileURI : inputFileURIs) {
+        Map<String, String> singleFileGenerationTaskConfig =
+            getSingleFileGenerationTaskConfig(offlineTableName, tableNumTasks, batchConfigMap, inputFileURI,
+                generateFixedSegmentName(offlineTableName, taskUUID, tableNumTasks));

Review Comment:
   Perhaps fall back to the `fixed` generator only when no segment name generator is configured in the task configs, instead of always using the `fixed` generator here?
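Below is a minimal sketch of the suggested fallback. The config key names are assumptions, loosely modeled on Pinot's batch segment-name-generator settings; check them against `BatchConfigProperties` before relying on them. The segment name format reuses the `%s_%s_%d` pattern from `generateFixedSegmentName` above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the suggested fallback; the key names below are assumptions.
final class SegmentNameFallback {
  static final String GENERATOR_TYPE_KEY = "segmentNameGenerator.type";
  static final String FIXED_SEGMENT_NAME_KEY = "segmentNameGenerator.configs.segment.name";
  static final String FIXED_GENERATOR = "fixed";

  private SegmentNameFallback() {
  }

  // Uses a fixed, generated segment name only when the task configs do not choose a generator.
  static Map<String, String> applyDefault(Map<String, String> taskConfig, String tableName, String taskUUID,
      int taskIndex) {
    Map<String, String> result = new HashMap<>(taskConfig);
    if (!result.containsKey(GENERATOR_TYPE_KEY)) {
      result.put(GENERATOR_TYPE_KEY, FIXED_GENERATOR);
      result.put(FIXED_SEGMENT_NAME_KEY, String.format("%s_%s_%d", tableName, taskUUID, taskIndex));
    }
    return result;
  }
}
```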



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java:
##########
@@ -128,6 +132,53 @@ public PinotTaskManager(PinotHelixTaskResourceManager helixTaskResourceManager,
     }
   }
 
+  public String createTask(Map<String, String> taskConfigs)
+      throws Exception {
+    String taskName = taskConfigs.get("taskName");
+    if (taskName == null) {
+      throw new IllegalArgumentException("Missing field 'taskName'");
+    }
+    String taskType = taskConfigs.get("taskType");
+    if (taskType == null) {
+      throw new IllegalArgumentException("Missing field 'taskType'");
+    }
+    String tableName = taskConfigs.get("tableName");
+    if (tableName == null) {
+      throw new IllegalArgumentException("Missing field 'tableName'");
+    }
+    String parentTaskName = _helixTaskResourceManager.getParentTaskName(taskType, taskName);
+    TaskState taskState = _helixTaskResourceManager.getTaskState(parentTaskName);
+    if (taskState != null) {
+      throw new RuntimeException(
+          "Task [" + taskName + "] of type [" + taskType + "] is already created. Current state is " + taskState);
+    }
+    if (TableNameBuilder.isRealtimeTableResource(tableName)) {
+      throw new UnsupportedOperationException(
+          "Realtime table: " + tableName + " is not supported for task type - " + taskType);
+    }
+    String tableNameWithType =
+        TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(TableNameBuilder.extractRawTableName(tableName));
+    TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType);
+    PinotTaskGenerator taskGenerator = _taskGeneratorRegistry.getTaskGenerator(taskType);
+    // Generate each type of tasks
+    if (taskGenerator != null) {
+      _helixTaskResourceManager.ensureTaskQueueExists(taskType);
+      addTaskTypeMetricsUpdaterIfNeeded(taskType);
+      LOGGER.info("Trying to create a task type: {}", taskGenerator.getTaskType());
+      List<PinotTaskConfig> pinotTaskConfigs = taskGenerator.generateAdhocTasks(tableConfig, taskConfigs);
+      if (pinotTaskConfigs.isEmpty()) {
+        throw new NotFoundException("No task config has been generated");

Review Comment:
   how about `return null` like the scheduleTask method?




[GitHub] [pinot] xiangfu0 commented on a diff in pull request #8465: Add adhoc minion task creation endpoint

Posted by GitBox <gi...@apache.org>.
xiangfu0 commented on code in PR #8465:
URL: https://github.com/apache/pinot/pull/8465#discussion_r848696669


##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segmentgenerationandpush/SegmentGenerationAndPushTaskGenerator.java:
##########
@@ -177,6 +180,76 @@ public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
     return pinotTaskConfigs;
   }
 
+  @Override
+  public List<PinotTaskConfig> generateAdhocTasks(TableConfig tableConfig, Map<String, String> taskConfigs)
+      throws Exception {
+    String taskUUID = UUID.randomUUID().toString();
+    // Only generate tasks for OFFLINE tables
+    String offlineTableName = tableConfig.getTableName();
+    if (tableConfig.getTableType() != TableType.OFFLINE) {
+      LOGGER.warn("Skip generating SegmentGenerationAndPushTask for non-OFFLINE table: {}", offlineTableName);
+      return ImmutableList.of();

Review Comment:
   I see. Then I think the task generation call can return null, and the external caller will skip adding it to the map. Eventually, this call will throw an exception if the response map is empty (which means task generation did not succeed for any table).
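Below is a minimal sketch of the caller-side flow described in this comment. The generator is called once per table, a null or empty result is skipped, and an exception is thrown only when nothing was generated for any table. It is a hypothetical, Helix-free model. Task configs are plain strings, and `generate` and `submit` stand in for the task generator and the Helix submission.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch of the caller-side handling; not the PR's actual code.
final class AdhocTaskFanOut {
  private AdhocTaskFanOut() {
  }

  static Map<String, String> createForTables(List<String> tableNamesWithType,
      Function<String, List<String>> generate, Function<List<String>, String> submit) {
    Map<String, String> submittedTaskNames = new LinkedHashMap<>();
    for (String tableNameWithType : tableNamesWithType) {
      List<String> taskConfigs = generate.apply(tableNameWithType);
      // A null (or empty) result means the generator does not apply to this table, e.g. a
      // REALTIME table for SegmentGenerationAndPushTask; skip it instead of failing.
      if (taskConfigs == null || taskConfigs.isEmpty()) {
        continue;
      }
      submittedTaskNames.put(tableNameWithType, submit.apply(taskConfigs));
    }
    if (submittedTaskNames.isEmpty()) {
      throw new IllegalStateException("No task was generated for tables: " + tableNamesWithType);
    }
    return submittedTaskNames;
  }
}
```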




[GitHub] [pinot] xiangfu0 commented on a diff in pull request #8465: Add adhoc minion task creation endpoint

Posted by GitBox <gi...@apache.org>.
xiangfu0 commented on code in PR #8465:
URL: https://github.com/apache/pinot/pull/8465#discussion_r847830015


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java:
##########
@@ -128,6 +132,68 @@ public PinotTaskManager(PinotHelixTaskResourceManager helixTaskResourceManager,
     }
   }
 
+  public Map<String, String> createTask(String taskType, String tableName, @Nullable String taskName,
+      Map<String, String> taskConfigs)
+      throws Exception {
+    if (taskName == null) {
+      taskName = tableName + "_" + UUID.randomUUID();
+      LOGGER.info("Task name is missing, auto-generate one: {}", taskName);
+    }
+    String minionInstanceTag =
+        taskConfigs.getOrDefault("minionInstanceTag", CommonConstants.Helix.UNTAGGED_MINION_INSTANCE);
+    String parentTaskName = _helixTaskResourceManager.getParentTaskName(taskType, taskName);
+    TaskState taskState = _helixTaskResourceManager.getTaskState(parentTaskName);
+    if (taskState != null) {
+      throw new RuntimeException(

Review Comment:
   Added a few exceptions to handle those cases and return 4xx error codes.
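Below is a minimal sketch of mapping creation failures to 4xx status codes. All of it is hypothetical: the exception class names and the chosen codes are illustrative, and the PR's actual exception types and REST error handling may differ. Anything not recognized as a client error stays a 500.

```java
// Hypothetical exception types and status mapping, for illustration only.
final class TaskCreationErrors {
  static final class TaskAlreadyExistsException extends RuntimeException {
    TaskAlreadyExistsException(String message) {
      super(message);
    }
  }

  static final class TableNotFoundException extends RuntimeException {
    TableNotFoundException(String message) {
      super(message);
    }
  }

  static final class UnknownTaskTypeException extends RuntimeException {
    UnknownTaskTypeException(String message) {
      super(message);
    }
  }

  private TaskCreationErrors() {
  }

  // Maps client-side failures to 4xx codes; anything unexpected stays a 500.
  static int toHttpStatus(Exception e) {
    if (e instanceof TaskAlreadyExistsException) {
      return 409; // Conflict: a task with this name already exists
    }
    if (e instanceof TableNotFoundException) {
      return 404; // Not Found
    }
    if (e instanceof UnknownTaskTypeException || e instanceof IllegalArgumentException) {
      return 400; // Bad Request
    }
    return 500;
  }
}
```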



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java:
##########
@@ -128,6 +132,68 @@ public PinotTaskManager(PinotHelixTaskResourceManager helixTaskResourceManager,
     }
   }
 
+  public Map<String, String> createTask(String taskType, String tableName, @Nullable String taskName,
+      Map<String, String> taskConfigs)
+      throws Exception {
+    if (taskName == null) {
+      taskName = tableName + "_" + UUID.randomUUID();
+      LOGGER.info("Task name is missing, auto-generate one: {}", taskName);
+    }
+    String minionInstanceTag =
+        taskConfigs.getOrDefault("minionInstanceTag", CommonConstants.Helix.UNTAGGED_MINION_INSTANCE);
+    String parentTaskName = _helixTaskResourceManager.getParentTaskName(taskType, taskName);
+    TaskState taskState = _helixTaskResourceManager.getTaskState(parentTaskName);
+    if (taskState != null) {
+      throw new RuntimeException(
+          "Task [" + taskName + "] of type [" + taskType + "] is already created. Current state is " + taskState);
+    }
+    List<String> tableNameWithTypes = new ArrayList<>();
+    if (TableNameBuilder.getTableTypeFromTableName(tableName) == null) {
+      String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(tableName);
+      if (_pinotHelixResourceManager.hasOfflineTable(offlineTableName)) {
+        tableNameWithTypes.add(offlineTableName);
+      }
+      String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(tableName);
+      if (_pinotHelixResourceManager.hasRealtimeTable(realtimeTableName)) {
+        tableNameWithTypes.add(realtimeTableName);
+      }
+    } else {
+      if (_pinotHelixResourceManager.hasTable(tableName)) {
+        tableNameWithTypes.add(tableName);
+      }
+    }
+    if (tableNameWithTypes.isEmpty()) {
+      throw new NotFoundException("'tableName' " + tableName + " is not found");

Review Comment:
   done



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java:
##########
@@ -128,6 +132,68 @@ public PinotTaskManager(PinotHelixTaskResourceManager helixTaskResourceManager,
     }
   }
 
+  public Map<String, String> createTask(String taskType, String tableName, @Nullable String taskName,
+      Map<String, String> taskConfigs)
+      throws Exception {
+    if (taskName == null) {
+      taskName = tableName + "_" + UUID.randomUUID();
+      LOGGER.info("Task name is missing, auto-generate one: {}", taskName);
+    }
+    String minionInstanceTag =
+        taskConfigs.getOrDefault("minionInstanceTag", CommonConstants.Helix.UNTAGGED_MINION_INSTANCE);
+    String parentTaskName = _helixTaskResourceManager.getParentTaskName(taskType, taskName);
+    TaskState taskState = _helixTaskResourceManager.getTaskState(parentTaskName);
+    if (taskState != null) {
+      throw new RuntimeException(
+          "Task [" + taskName + "] of type [" + taskType + "] is already created. Current state is " + taskState);
+    }
+    List<String> tableNameWithTypes = new ArrayList<>();
+    if (TableNameBuilder.getTableTypeFromTableName(tableName) == null) {
+      String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(tableName);
+      if (_pinotHelixResourceManager.hasOfflineTable(offlineTableName)) {
+        tableNameWithTypes.add(offlineTableName);
+      }
+      String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(tableName);
+      if (_pinotHelixResourceManager.hasRealtimeTable(realtimeTableName)) {
+        tableNameWithTypes.add(realtimeTableName);
+      }
+    } else {
+      if (_pinotHelixResourceManager.hasTable(tableName)) {
+        tableNameWithTypes.add(tableName);
+      }
+    }
+    if (tableNameWithTypes.isEmpty()) {
+      throw new NotFoundException("'tableName' " + tableName + " is not found");
+    }
+
+    PinotTaskGenerator taskGenerator = _taskGeneratorRegistry.getTaskGenerator(taskType);
+    // Generate each type of tasks
+    if (taskGenerator == null) {
+      throw new UnsupportedOperationException(

Review Comment:
   done
   




[GitHub] [pinot] xiangfu0 commented on a diff in pull request #8465: Add adhoc minion task creation endpoint

Posted by GitBox <gi...@apache.org>.
xiangfu0 commented on code in PR #8465:
URL: https://github.com/apache/pinot/pull/8465#discussion_r847822790


##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segmentgenerationandpush/SegmentGenerationAndPushTaskGenerator.java:
##########
@@ -177,6 +180,76 @@ public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
     return pinotTaskConfigs;
   }
 
+  @Override
+  public List<PinotTaskConfig> generateAdhocTasks(TableConfig tableConfig, Map<String, String> taskConfigs)
+      throws Exception {
+    String taskUUID = UUID.randomUUID().toString();
+    // Only generate tasks for OFFLINE tables
+    String offlineTableName = tableConfig.getTableName();
+    if (tableConfig.getTableType() != TableType.OFFLINE) {
+      LOGGER.warn("Skip generating SegmentGenerationAndPushTask for non-OFFLINE table: {}", offlineTableName);
+      return ImmutableList.of();

Review Comment:
   This is not an exceptional case: the external caller calls this twice, separately for the realtime and offline tables.



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java:
##########
@@ -347,7 +368,11 @@ public synchronized Set<String> getTasksInProgress(String taskType) {
    */
   public synchronized TaskState getTaskState(String taskName) {
     String taskType = getTaskType(taskName);
-    return _taskDriver.getWorkflowContext(getHelixJobQueueName(taskType)).getJobState(getHelixJobName(taskName));
+    WorkflowContext workflowContext = _taskDriver.getWorkflowContext(getHelixJobQueueName(taskType));
+    if (workflowContext == null) {

Review Comment:
   In the current code, Pinot sets up the job queue for a task type only when a task of that type is first scheduled. That means querying a task type that hasn't been registered yet makes this call throw an NPE.
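Below is a minimal, Helix-free sketch of the null check in the diff above. `workflowContexts` stands in for `TaskDriver.getWorkflowContext(queueName)`, which, per this comment, yields null for a queue that was never created. A workflow context is modeled as a map from job name to job state. The class, enum, and queue names are hypothetical.

```java
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch of a null-safe task-state lookup; not the actual Helix types.
final class NullSafeTaskState {
  enum JobState { IN_PROGRESS, COMPLETED, FAILED }

  private NullSafeTaskState() {
  }

  static JobState getTaskState(String jobQueueName, String jobName,
      Function<String, Map<String, JobState>> workflowContexts) {
    Map<String, JobState> workflowContext = workflowContexts.apply(jobQueueName);
    if (workflowContext == null) {
      // No job queue yet for this task type, so no task of that type can exist.
      return null;
    }
    return workflowContext.get(jobName);
  }
}
```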




[GitHub] [pinot] mcvsubbu commented on a diff in pull request #8465: Add adhoc minion task creation endpoint

Posted by GitBox <gi...@apache.org>.
mcvsubbu commented on code in PR #8465:
URL: https://github.com/apache/pinot/pull/8465#discussion_r848697013


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java:
##########
@@ -347,7 +368,11 @@ public synchronized Set<String> getTasksInProgress(String taskType) {
    */
   public synchronized TaskState getTaskState(String taskName) {
     String taskType = getTaskType(taskName);
-    return _taskDriver.getWorkflowContext(getHelixJobQueueName(taskType)).getJobState(getHelixJobName(taskName));
+    WorkflowContext workflowContext = _taskDriver.getWorkflowContext(getHelixJobQueueName(taskType));
+    if (workflowContext == null) {

Review Comment:
   Right, and I think we have periodic tasks calling this API.




[GitHub] [pinot] xiangfu0 commented on pull request #8465: Add adhoc minion task creation endpoint

Posted by GitBox <gi...@apache.org>.
xiangfu0 commented on PR #8465:
URL: https://github.com/apache/pinot/pull/8465#issuecomment-1089067078

   > Please give me a couple of days, I would like to review this.
   > 
   > Meanwhile, if you can please mention some use cases I would appreciate it.
   
   Thanks, @mcvsubbu. In short, the goal here is to add a new public endpoint to schedule ad-hoc minion tasks.
   
   Currently, minion tasks require all task configs to be part of the table config. That makes sense for regularly scheduled jobs, but when I want to quickly test something, it's hard to bring in data using minions.
   
   Once this feature is in place, we can also implement SQL `INSERT INTO` with raw files in the query console to simplify the ingestion process.
   
   E.g.
   ```
   INSERT INTO [db.]table [(c1, c2, c3)] FROM INFILE file_name [COMPRESSION type] FORMAT format_name;
   ```
   or 
   ```
   LOAD LABEL [db.]task (
        DATA INFILE("s3a://my-s3-bucket/*/*.parquet") INTO TABLE `table`
   )
   WITH MINION minionTag (
     "fs.s3.accessKeyId" = "<my-access-key>",
     "fs.s3.accessKeySecret" = "<my-secret-key>"
   );
   ```
   
   
   
   



[GitHub] [pinot] xiangfu0 commented on a diff in pull request #8465: Add adhoc minion task creation endpoint

Posted by GitBox <gi...@apache.org>.
xiangfu0 commented on code in PR #8465:
URL: https://github.com/apache/pinot/pull/8465#discussion_r843361324


##########
pinot-controller/src/main/java/org/apache/pinot/controller/util/FileIngestionHelper.java:
##########
@@ -176,7 +176,7 @@ public static void copyURIToLocal(Map<String, String> batchConfigMap, URI source
     if (!PinotFSFactory.isSchemeSupported(sourceFileURIScheme)) {
       PinotFSFactory.register(sourceFileURIScheme, batchConfigMap.get(BatchConfigProperties.INPUT_FS_CLASS),
           IngestionConfigUtils.getInputFsProps(IngestionConfigUtils.getConfigMapWithPrefix(
-              batchConfigMap, BatchConfigProperties.INPUT_FS_PROP_PREFIX)));
+              batchConfigMap, BatchConfigProperties.INPUT_FS_PROP_PREFIX + IngestionConfigUtils.DOT_SEPARATOR)));

Review Comment:
   This is a bug I found while copying the code.
   It generated props whose keys all started with DOT_SEPARATOR.
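To see why the trailing separator matters, here is a minimal sketch. It assumes the helper selects keys that start with the prefix and strips the prefix off; `configMapWithPrefix` below is a hypothetical stand-in, not Pinot's `IngestionConfigUtils.getConfigMapWithPrefix`, and the `input.fs.prop` prefix value is illustrative only.

```java
import java.util.HashMap;
import java.util.Map;

public class PrefixStripDemo {
  // Hypothetical stand-in for a "select keys by prefix and strip the prefix" helper.
  // It is not IngestionConfigUtils.getConfigMapWithPrefix; the real helper may differ.
  static Map<String, String> configMapWithPrefix(Map<String, String> configs, String prefix) {
    Map<String, String> result = new HashMap<>();
    for (Map.Entry<String, String> entry : configs.entrySet()) {
      String key = entry.getKey();
      if (key.startsWith(prefix)) {
        // Keep only the part of the key that follows the prefix.
        result.put(key.substring(prefix.length()), entry.getValue());
      }
    }
    return result;
  }

  public static void main(String[] args) {
    // Assumed key layout "<prefix>.<property>"; the prefix value is illustrative.
    String prefix = "input.fs.prop";
    Map<String, String> batchConfig = new HashMap<>();
    batchConfig.put(prefix + ".accessKey", "<my-access-key>");

    // Without the separator the stripped key keeps its leading dot: {.accessKey=...}
    System.out.println(configMapWithPrefix(batchConfig, prefix));
    // Appending "." to the prefix yields the intended key: {accessKey=...}
    System.out.println(configMapWithPrefix(batchConfig, prefix + "."));
  }
}
```

A downstream filesystem plugin looking up `accessKey` would miss `.accessKey`, which is consistent with the S3 config problem described here.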



[GitHub] [pinot] xiangfu0 commented on a diff in pull request #8465: Add adhoc minion task creation endpoint

Posted by GitBox <gi...@apache.org>.
xiangfu0 commented on code in PR #8465:
URL: https://github.com/apache/pinot/pull/8465#discussion_r843359033


##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java:
##########
@@ -346,6 +347,29 @@ public StringResultResponse getTaskStateDeprecated(
     }
   }
 
+  @GET
+  @Path("/tasks/all")
+  @Authenticate(AccessType.READ)
+  @ApiOperation("Read all tasks status")
+  public Map<String, TaskState> getAllTasks() {
+    Map<String, TaskState> taskStates = new HashMap<>();
+    _pinotHelixTaskResourceManager.getTaskTypes()
+        .forEach(taskType -> taskStates.putAll(_pinotHelixTaskResourceManager.getTaskStates(taskType)));
+    return taskStates;
+  }
+
+  @POST
+  @Path("/tasks/create")

Review Comment:
   sure, will change to `/tasks/execute`



[GitHub] [pinot] xiangfu0 commented on pull request #8465: Add adhoc minion task creation endpoint

Posted by GitBox <gi...@apache.org>.
xiangfu0 commented on PR #8465:
URL: https://github.com/apache/pinot/pull/8465#issuecomment-1098494069

   > Minor change suggested. thank you
   
   Thanks for your comments!


[GitHub] [pinot] klsince commented on a diff in pull request #8465: Add adhoc minion task creation endpoint

Posted by GitBox <gi...@apache.org>.
klsince commented on code in PR #8465:
URL: https://github.com/apache/pinot/pull/8465#discussion_r843444619


##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java:
##########
@@ -346,6 +347,29 @@ public StringResultResponse getTaskStateDeprecated(
     }
   }
 
+  @GET
+  @Path("/tasks/all")
+  @Authenticate(AccessType.READ)
+  @ApiOperation("Read all tasks status")
+  public Map<String, TaskState> getAllTasks() {
+    Map<String, TaskState> taskStates = new HashMap<>();
+    _pinotHelixTaskResourceManager.getTaskTypes()
+        .forEach(taskType -> taskStates.putAll(_pinotHelixTaskResourceManager.getTaskStates(taskType)));
+    return taskStates;
+  }
+
+  @POST
+  @Path("/tasks/create")
+  @Authenticate(AccessType.CREATE)

Review Comment:
   Maybe make `taskName` an optional `@QueryParam`, and auto-generate a task name if the user does not provide one.
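A minimal sketch of the fallback, assuming the resource receives `taskName` as a nullable `String` (a JAX-RS `@QueryParam` without a default is typically `null` when omitted). `resolveTaskName` and the `<taskType>_<tableName>_<uuid>` format are hypothetical, not part of the PR.

```java
import java.util.UUID;

public class TaskNameDefaults {
  // Hypothetical helper: use the caller's task name if given, otherwise generate one.
  // The "<taskType>_<tableName>_<uuid>" format is an assumption for illustration.
  static String resolveTaskName(String taskName, String taskType, String tableName) {
    if (taskName != null && !taskName.trim().isEmpty()) {
      return taskName;
    }
    return taskType + "_" + tableName + "_" + UUID.randomUUID();
  }

  public static void main(String[] args) {
    // An explicit name is kept as-is.
    System.out.println(resolveTaskName("myTask", "SegmentGenerationAndPushTask", "myTable"));
    // A missing name gets a fresh default with a random suffix.
    System.out.println(resolveTaskName(null, "SegmentGenerationAndPushTask", "myTable"));
  }
}
```

Because `createTask` in this PR rejects a task name that already exists, a random suffix keeps repeated ad-hoc runs from colliding when users do not name them.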



[GitHub] [pinot] xiangfu0 commented on a diff in pull request #8465: Add adhoc minion task creation endpoint

Posted by GitBox <gi...@apache.org>.
xiangfu0 commented on code in PR #8465:
URL: https://github.com/apache/pinot/pull/8465#discussion_r843193157


##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java:
##########
@@ -346,6 +347,29 @@ public StringResultResponse getTaskStateDeprecated(
     }
   }
 
+  @GET
+  @Path("/tasks/all")

Review Comment:
   We don't have one API to show all the task states.



[GitHub] [pinot] mcvsubbu commented on pull request #8465: Add adhoc minion task creation endpoint

Posted by GitBox <gi...@apache.org>.
mcvsubbu commented on PR #8465:
URL: https://github.com/apache/pinot/pull/8465#issuecomment-1089024664

   Please give me a couple of days, I would like to review this.
   
   Meanwhile, I would appreciate it if you could mention some use cases.


[GitHub] [pinot] Jackie-Jiang commented on a diff in pull request #8465: Add adhoc minion task creation endpoint

Posted by GitBox <gi...@apache.org>.
Jackie-Jiang commented on code in PR #8465:
URL: https://github.com/apache/pinot/pull/8465#discussion_r846383207


##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java:
##########
@@ -346,6 +347,29 @@ public StringResultResponse getTaskStateDeprecated(
     }
   }
 
+  @GET
+  @Path("/tasks/all")
+  @Authenticate(AccessType.READ)
+  @ApiOperation("Read all tasks status")
+  public Map<String, TaskState> getAllTasks() {
+    Map<String, TaskState> taskStates = new HashMap<>();
+    _pinotHelixTaskResourceManager.getTaskTypes()
+        .forEach(taskType -> taskStates.putAll(_pinotHelixTaskResourceManager.getTaskStates(taskType)));
+    return taskStates;
+  }
+
+  @POST
+  @Path("/tasks/create")
+  @Authenticate(AccessType.CREATE)

Review Comment:
   I'd suggest adding a JSON-serializable wrapper class that wraps all the arguments and the config map. See `Instance` as an example. The API can then be as simple as `POST /tasks/execute`.
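One possible shape for such a wrapper, sketched under assumptions: the class name `AdhocTaskRequest` is hypothetical, the fields mirror the keys `createTask` reads in this PR (`taskType`, `tableName`, `taskName`), and JSON binding annotations are left out so the sketch needs only the JDK.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical request body for the ad-hoc task endpoint. The field names mirror the keys
// createTask() reads ("taskType", "tableName", "taskName"); the class name is illustrative.
public class AdhocTaskRequest {
  private final String _taskType;
  private final String _tableName;
  private final String _taskName; // optional, may be null
  private final Map<String, String> _taskConfigs;

  public AdhocTaskRequest(String taskType, String tableName, String taskName, Map<String, String> taskConfigs) {
    // Validate required fields once, when the request is built.
    if (taskType == null) {
      throw new IllegalArgumentException("Missing field 'taskType'");
    }
    if (tableName == null) {
      throw new IllegalArgumentException("Missing field 'tableName'");
    }
    _taskType = taskType;
    _tableName = tableName;
    _taskName = taskName;
    // Read-only copy so callers cannot mutate the request after validation.
    _taskConfigs = taskConfigs == null ? Collections.<String, String>emptyMap()
        : Collections.unmodifiableMap(new HashMap<>(taskConfigs));
  }

  // Bean-style getters let a JSON mapper serialize the object; deserializing would also
  // need annotations such as Jackson's @JsonCreator/@JsonProperty (not shown).
  public String getTaskType() {
    return _taskType;
  }

  public String getTableName() {
    return _tableName;
  }

  public String getTaskName() {
    return _taskName;
  }

  public Map<String, String> getTaskConfigs() {
    return _taskConfigs;
  }
}
```

Moving the "missing field" checks into the constructor means the endpoint can reject a malformed body before it touches the task manager.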



[GitHub] [pinot] xiangfu0 commented on pull request #8465: Add adhoc minion task creation endpoint

Posted by GitBox <gi...@apache.org>.
xiangfu0 commented on PR #8465:
URL: https://github.com/apache/pinot/pull/8465#issuecomment-1092180679

   @mcvsubbu, can you take another look?


[GitHub] [pinot] xiangfu0 commented on a diff in pull request #8465: Add adhoc minion task creation endpoint

Posted by GitBox <gi...@apache.org>.
xiangfu0 commented on code in PR #8465:
URL: https://github.com/apache/pinot/pull/8465#discussion_r848727357


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java:
##########
@@ -347,7 +368,11 @@ public synchronized Set<String> getTasksInProgress(String taskType) {
    */
   public synchronized TaskState getTaskState(String taskName) {
     String taskType = getTaskType(taskName);
-    return _taskDriver.getWorkflowContext(getHelixJobQueueName(taskType)).getJobState(getHelixJobName(taskName));
+    WorkflowContext workflowContext = _taskDriver.getWorkflowContext(getHelixJobQueueName(taskType));
+    if (workflowContext == null) {

Review Comment:
   The periodic call won't help; you have the same check at line 390 (https://github.com/apache/pinot/pull/8465/files#diff-1ff7c17d681951c55f3f7b0540609805063b2f9ffa90df78a93e440d85445d34R390) as well.



[GitHub] [pinot] mcvsubbu commented on a diff in pull request #8465: Add adhoc minion task creation endpoint

Posted by GitBox <gi...@apache.org>.
mcvsubbu commented on code in PR #8465:
URL: https://github.com/apache/pinot/pull/8465#discussion_r849840870


##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java:
##########
@@ -366,6 +379,33 @@ public Map<String, String> scheduleTasks(@ApiParam(value = "Task type") @QueryPa
     }
   }
 
+  @POST
+  @Path("/tasks/execute")
+  @Authenticate(AccessType.CREATE)
+  @ApiOperation("Create an adhoc task to be running on minion")

Review Comment:
   ```suggestion
     @ApiOperation("Execute a task on minion")
   ```
   ```suggestion
     @ApiOperation("Create an adhoc task to be running on minion")
   ```
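For reference, a client call to the `POST /tasks/execute` endpoint shown in this diff might be built as below with the JDK 11+ `java.net.http` API. The controller address and the JSON field names (`taskType`, `tableName`, `taskConfigs`, `inputDirURI`) are assumptions for illustration; the request is only built, not sent.

```java
import java.net.URI;
import java.net.http.HttpRequest;

public class ExecuteAdhocTaskRequest {
  // Builds (but does not send) a POST to the ad-hoc task endpoint. The controller address
  // and the JSON field names are assumptions for illustration.
  static HttpRequest buildRequest(String controllerUrl, String tableName, String inputDirUri) {
    // Hand-built JSON without escaping: only safe for simple identifiers and URIs.
    String body = "{"
        + "\"taskType\":\"SegmentGenerationAndPushTask\","
        + "\"tableName\":\"" + tableName + "\","
        + "\"taskConfigs\":{\"inputDirURI\":\"" + inputDirUri + "\"}"
        + "}";
    return HttpRequest.newBuilder()
        .uri(URI.create(controllerUrl + "/tasks/execute"))
        .header("Content-Type", "application/json")
        .POST(HttpRequest.BodyPublishers.ofString(body))
        .build();
  }

  public static void main(String[] args) {
    HttpRequest request = buildRequest("http://localhost:9000", "myTable", "s3://my-bucket/data/");
    System.out.println(request.method() + " " + request.uri());
    // Sending would use java.net.http.HttpClient, e.g.
    // HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
  }
}
```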



[GitHub] [pinot] xiangfu0 commented on a diff in pull request #8465: Add adhoc minion task creation endpoint

Posted by GitBox <gi...@apache.org>.
xiangfu0 commented on code in PR #8465:
URL: https://github.com/apache/pinot/pull/8465#discussion_r847829124


##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segmentgenerationandpush/SegmentGenerationAndPushTaskExecutor.java:
##########
@@ -305,8 +303,8 @@ protected SegmentGenerationTaskSpec generateTaskSpec(Map<String, String> taskCon
     }
     SegmentNameGeneratorSpec segmentNameGeneratorSpec = new SegmentNameGeneratorSpec();
     segmentNameGeneratorSpec.setType(taskConfigs.get(BatchConfigProperties.SEGMENT_NAME_GENERATOR_TYPE));
-    segmentNameGeneratorSpec.setConfigs(IngestionConfigUtils
-        .getConfigMapWithPrefix(taskConfigs, BatchConfigProperties.SEGMENT_NAME_GENERATOR_PROP_PREFIX));
+    segmentNameGeneratorSpec.setConfigs(IngestionConfigUtils.getConfigMapWithPrefix(taskConfigs,
+        BatchConfigProperties.SEGMENT_NAME_GENERATOR_PROP_PREFIX + IngestionConfigUtils.DOT_SEPARATOR));

Review Comment:
   https://github.com/apache/pinot/pull/8511



##########
pinot-controller/src/main/java/org/apache/pinot/controller/util/FileIngestionHelper.java:
##########
@@ -176,7 +176,7 @@ public static void copyURIToLocal(Map<String, String> batchConfigMap, URI source
     if (!PinotFSFactory.isSchemeSupported(sourceFileURIScheme)) {
       PinotFSFactory.register(sourceFileURIScheme, batchConfigMap.get(BatchConfigProperties.INPUT_FS_CLASS),
           IngestionConfigUtils.getInputFsProps(IngestionConfigUtils.getConfigMapWithPrefix(
-              batchConfigMap, BatchConfigProperties.INPUT_FS_PROP_PREFIX)));
+              batchConfigMap, BatchConfigProperties.INPUT_FS_PROP_PREFIX + IngestionConfigUtils.DOT_SEPARATOR)));

Review Comment:
   https://github.com/apache/pinot/pull/8511



[GitHub] [pinot] xiangfu0 commented on a diff in pull request #8465: Add adhoc minion task creation endpoint

Posted by GitBox <gi...@apache.org>.
xiangfu0 commented on code in PR #8465:
URL: https://github.com/apache/pinot/pull/8465#discussion_r848692810


##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segmentgenerationandpush/SegmentGenerationAndPushTaskGenerator.java:
##########
@@ -177,6 +180,76 @@ public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
     return pinotTaskConfigs;
   }
 
+  @Override
+  public List<PinotTaskConfig> generateAdhocTasks(TableConfig tableConfig, Map<String, String> taskConfigs)
+      throws Exception {
+    String taskUUID = UUID.randomUUID().toString();
+    // Only generate tasks for OFFLINE tables
+    String offlineTableName = tableConfig.getTableName();
+    if (tableConfig.getTableType() != TableType.OFFLINE) {
+      LOGGER.warn("Skip generating SegmentGenerationAndPushTask for non-OFFLINE table: {}", offlineTableName);
+      return ImmutableList.of();
+    }
+
+    // Override task configs from table with adhoc task configs.
+    Map<String, String> batchConfigMap = new HashMap<>();
+    TableTaskConfig tableTaskConfig = tableConfig.getTaskConfig();
+    if (tableTaskConfig != null) {
+      batchConfigMap.putAll(
+          tableTaskConfig.getConfigsForTaskType(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE));
+    }
+    batchConfigMap.putAll(taskConfigs);
+
+    int tableNumTasks = 0;
+    try {
+      URI inputDirURI =
+          SegmentGenerationUtils.getDirectoryURI(batchConfigMap.get(BatchConfigProperties.INPUT_DIR_URI));
+      List<URI> inputFileURIs = getInputFilesFromDirectory(batchConfigMap, inputDirURI, Collections.emptySet());
+      if (inputFileURIs.isEmpty()) {
+        LOGGER.warn("Skip generating SegmentGenerationAndPushTask, no input files found : {}", inputDirURI);
+        return ImmutableList.of();

Review Comment:
   The response returns a mapping from table name to a list of task names.
   E.g. if the input directory is empty or all the files are filtered out, this should be a 2xx response
   with the table name mapped to an empty list.
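A minimal sketch of the response shape described in this reply, assuming the endpoint returns a `Map<String, List<String>>`; `toResponse` is a hypothetical helper, not code from the PR.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class AdhocTaskResponseSketch {
  // Hypothetical: builds the "table name -> task names" response body.
  // An empty input directory is not treated as an error; the table simply maps to an
  // empty list and the endpoint still answers with a 2xx status.
  static Map<String, List<String>> toResponse(String tableNameWithType, List<String> taskNames) {
    Map<String, List<String>> response = new LinkedHashMap<>();
    response.put(tableNameWithType, taskNames == null ? Collections.<String>emptyList() : taskNames);
    return response;
  }

  public static void main(String[] args) {
    System.out.println(toResponse("myTable_OFFLINE", Collections.<String>emptyList())); // {myTable_OFFLINE=[]}
  }
}
```

With this shape, a caller distinguishes "nothing to do" from failure by inspecting the list rather than the status code.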
   



[GitHub] [pinot] mcvsubbu commented on a diff in pull request #8465: Add adhoc minion task creation endpoint

Posted by GitBox <gi...@apache.org>.
mcvsubbu commented on code in PR #8465:
URL: https://github.com/apache/pinot/pull/8465#discussion_r848652473


##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segmentgenerationandpush/SegmentGenerationAndPushTaskGenerator.java:
##########
@@ -177,6 +180,76 @@ public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
     return pinotTaskConfigs;
   }
 
+  @Override
+  public List<PinotTaskConfig> generateAdhocTasks(TableConfig tableConfig, Map<String, String> taskConfigs)
+      throws Exception {
+    String taskUUID = UUID.randomUUID().toString();
+    // Only generate tasks for OFFLINE tables
+    String offlineTableName = tableConfig.getTableName();
+    if (tableConfig.getTableType() != TableType.OFFLINE) {
+      LOGGER.warn("Skip generating SegmentGenerationAndPushTask for non-OFFLINE table: {}", offlineTableName);
+      return ImmutableList.of();
+    }
+
+    // Override task configs from table with adhoc task configs.
+    Map<String, String> batchConfigMap = new HashMap<>();
+    TableTaskConfig tableTaskConfig = tableConfig.getTaskConfig();
+    if (tableTaskConfig != null) {
+      batchConfigMap.putAll(
+          tableTaskConfig.getConfigsForTaskType(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE));
+    }
+    batchConfigMap.putAll(taskConfigs);
+
+    int tableNumTasks = 0;
+    try {
+      URI inputDirURI =
+          SegmentGenerationUtils.getDirectoryURI(batchConfigMap.get(BatchConfigProperties.INPUT_DIR_URI));
+      List<URI> inputFileURIs = getInputFilesFromDirectory(batchConfigMap, inputDirURI, Collections.emptySet());
+      if (inputFileURIs.isEmpty()) {
+        LOGGER.warn("Skip generating SegmentGenerationAndPushTask, no input files found : {}", inputDirURI);
+        return ImmutableList.of();

Review Comment:
   Same as the previous case. What does the REST request return, and how will the caller differentiate between "no input files" (presumably an error condition) and "no table of this table type"?
   
   Perhaps one way is to throw specific exceptions and catch them in the API, so that the right messages can be returned to the caller (with the right HTTP code).
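The reviewer's suggestion could look roughly like this: one exception type per failure mode, mapped to a status code at the API layer. Both exception classes and the chosen codes are hypothetical; in a real resource the mapping would be applied where the exception is caught and rethrown as the resource's usual HTTP error.

```java
public class AdhocTaskErrors {
  // Hypothetical exception types, one per failure mode named in the review comment.
  static class UnsupportedTableTypeException extends RuntimeException {
    UnsupportedTableTypeException(String message) {
      super(message);
    }
  }

  static class NoInputFilesException extends RuntimeException {
    NoInputFilesException(String message) {
      super(message);
    }
  }

  // Maps a failure to an HTTP status so the caller can tell the cases apart.
  // The specific status codes are assumptions, not taken from the PR.
  static int toHttpStatus(Exception e) {
    if (e instanceof UnsupportedTableTypeException || e instanceof IllegalArgumentException) {
      return 400; // Bad Request: the request itself cannot be satisfied
    }
    if (e instanceof NoInputFilesException) {
      return 404; // Not Found: nothing to ingest at the given location
    }
    return 500; // anything unexpected
  }

  public static void main(String[] args) {
    System.out.println(toHttpStatus(new UnsupportedTableTypeException("REALTIME table"))); // 400
    System.out.println(toHttpStatus(new NoInputFilesException("s3://my-bucket/empty/"))); // 404
  }
}
```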



[GitHub] [pinot] xiangfu0 commented on a diff in pull request #8465: Add adhoc minion task creation endpoint

Posted by GitBox <gi...@apache.org>.
xiangfu0 commented on code in PR #8465:
URL: https://github.com/apache/pinot/pull/8465#discussion_r843354424


##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segmentgenerationandpush/SegmentGenerationAndPushTaskGenerator.java:
##########
@@ -177,6 +180,78 @@ public String getTaskType() {
     return pinotTaskConfigs;
   }
 
+  @Override
+  public List<PinotTaskConfig> generateAdhocTasks(TableConfig tableConfig, Map<String, String> taskConfigs)
+      throws Exception {
+    String taskUUID = UUID.randomUUID().toString();
+    // Only generate tasks for OFFLINE tables
+    String offlineTableName = tableConfig.getTableName();
+    if (tableConfig.getTableType() != TableType.OFFLINE) {
+      LOGGER.warn("Skip generating SegmentGenerationAndPushTask for non-OFFLINE table: {}", offlineTableName);
+      return ImmutableList.of();
+    }
+
+    TableTaskConfig tableTaskConfig = tableConfig.getTaskConfig();
+    if (tableTaskConfig != null) {
+      Map<String, String> taskConfigsFromTable =
+          tableTaskConfig.getConfigsForTaskType(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE);
+      if (taskConfigsFromTable != null) {
+        taskConfigsFromTable.entrySet().forEach(entry -> taskConfigs.putIfAbsent(entry.getKey(), entry.getValue()));

Review Comment:
   My thought is to take the task config from the table as the base, then apply the overrides from the ad-hoc task config.
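A minimal sketch of "table config as the base, ad-hoc config on top". The `putIfAbsent` loop in the diff also gives ad-hoc values priority, but it writes table values into the caller's `taskConfigs` map (and could throw if that map is unmodifiable); copying into a fresh map avoids both. The later revision of this method in the thread uses a similar `new HashMap` + `putAll` pattern. The config keys in `main` are illustrative.

```java
import java.util.HashMap;
import java.util.Map;

public class TaskConfigMerge {
  // Table-level task config is the base; ad-hoc entries override matching keys.
  // Copying into a fresh map leaves both inputs unmodified.
  static Map<String, String> merge(Map<String, String> tableTaskConfigs, Map<String, String> adhocTaskConfigs) {
    Map<String, String> merged = new HashMap<>();
    if (tableTaskConfigs != null) {
      merged.putAll(tableTaskConfigs);
    }
    if (adhocTaskConfigs != null) {
      merged.putAll(adhocTaskConfigs);
    }
    return merged;
  }

  public static void main(String[] args) {
    Map<String, String> fromTable = new HashMap<>();
    fromTable.put("inputFormat", "csv");
    fromTable.put("inputDirURI", "s3://my-bucket/daily/");
    Map<String, String> adhoc = new HashMap<>();
    adhoc.put("inputDirURI", "s3://my-bucket/backfill/");

    Map<String, String> merged = merge(fromTable, adhoc);
    System.out.println(merged.get("inputFormat")); // csv (inherited from the table)
    System.out.println(merged.get("inputDirURI")); // s3://my-bucket/backfill/ (ad-hoc wins)
  }
}
```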



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java:
##########
@@ -128,6 +132,53 @@ public PinotTaskManager(PinotHelixTaskResourceManager helixTaskResourceManager,
     }
   }
 
+  public String createTask(Map<String, String> taskConfigs)
+      throws Exception {
+    String taskName = taskConfigs.get("taskName");
+    if (taskName == null) {
+      throw new IllegalArgumentException("Missing field 'taskName'");
+    }
+    String taskType = taskConfigs.get("taskType");
+    if (taskType == null) {
+      throw new IllegalArgumentException("Missing field 'taskType'");
+    }
+    String tableName = taskConfigs.get("tableName");
+    if (tableName == null) {
+      throw new IllegalArgumentException("Missing field 'tableName'");
+    }
+    String parentTaskName = _helixTaskResourceManager.getParentTaskName(taskType, taskName);
+    TaskState taskState = _helixTaskResourceManager.getTaskState(parentTaskName);
+    if (taskState != null) {
+      throw new RuntimeException(
+          "Task [" + taskName + "] of type [" + taskType + "] is already created. Current state is " + taskState);
+    }
+    if (TableNameBuilder.isRealtimeTableResource(tableName)) {
+      throw new UnsupportedOperationException(
+          "Realtime table: " + tableName + " is not supported for task type - " + taskType);
+    }
+    String tableNameWithType =
+        TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(TableNameBuilder.extractRawTableName(tableName));
+    TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType);
+    PinotTaskGenerator taskGenerator = _taskGeneratorRegistry.getTaskGenerator(taskType);
+    // Generate each type of tasks
+    if (taskGenerator != null) {
+      _helixTaskResourceManager.ensureTaskQueueExists(taskType);
+      addTaskTypeMetricsUpdaterIfNeeded(taskType);
+      LOGGER.info("Trying to create a task type: {}", taskGenerator.getTaskType());
+      List<PinotTaskConfig> pinotTaskConfigs = taskGenerator.generateAdhocTasks(tableConfig, taskConfigs);
+      if (pinotTaskConfigs.isEmpty()) {
+        throw new NotFoundException("No task config has been generated");
+      }
+      LOGGER.info("Submitting ad-hoc task for task type: {} with task configs: {}", taskType, pinotTaskConfigs);
+      _controllerMetrics.addMeteredTableValue(taskType, ControllerMeter.NUMBER_TASKS_SUBMITTED, 1);
+      return _helixTaskResourceManager.submitTask(parentTaskName, pinotTaskConfigs,
+          CommonConstants.Helix.UNTAGGED_MINION_INSTANCE, taskGenerator.getTaskTimeoutMs(),
+          taskGenerator.getNumConcurrentTasksPerInstance());
+    }
+    throw new UnsupportedOperationException(

Review Comment:
   will do



[GitHub] [pinot] xiangfu0 commented on a diff in pull request #8465: Add adhoc minion task creation endpoint

Posted by GitBox <gi...@apache.org>.
xiangfu0 commented on code in PR #8465:
URL: https://github.com/apache/pinot/pull/8465#discussion_r843341911


##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java:
##########
@@ -346,6 +347,29 @@ public StringResultResponse getTaskStateDeprecated(
     }
   }
 
+  @GET
+  @Path("/tasks/all")

Review Comment:
   I will remove this API for now; we can add it back later once we have a better understanding of the task information.



[GitHub] [pinot] xiangfu0 commented on a diff in pull request #8465: Add adhoc minion task creation endpoint

Posted by GitBox <gi...@apache.org>.
xiangfu0 commented on code in PR #8465:
URL: https://github.com/apache/pinot/pull/8465#discussion_r843361324


##########
pinot-controller/src/main/java/org/apache/pinot/controller/util/FileIngestionHelper.java:
##########
@@ -176,7 +176,7 @@ public static void copyURIToLocal(Map<String, String> batchConfigMap, URI source
     if (!PinotFSFactory.isSchemeSupported(sourceFileURIScheme)) {
       PinotFSFactory.register(sourceFileURIScheme, batchConfigMap.get(BatchConfigProperties.INPUT_FS_CLASS),
           IngestionConfigUtils.getInputFsProps(IngestionConfigUtils.getConfigMapWithPrefix(
-              batchConfigMap, BatchConfigProperties.INPUT_FS_PROP_PREFIX)));
+              batchConfigMap, BatchConfigProperties.INPUT_FS_PROP_PREFIX + IngestionConfigUtils.DOT_SEPARATOR)));

Review Comment:
   This is a bug I found during my S3 config test.
   It generated props whose keys all started with DOT_SEPARATOR.



[GitHub] [pinot] xiangfu0 commented on a diff in pull request #8465: Add adhoc minion task creation endpoint

Posted by GitBox <gi...@apache.org>.
xiangfu0 commented on code in PR #8465:
URL: https://github.com/apache/pinot/pull/8465#discussion_r843358917


##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segmentgenerationandpush/SegmentGenerationAndPushTaskGenerator.java:
##########
@@ -177,6 +180,78 @@ public String getTaskType() {
     return pinotTaskConfigs;
   }
 
+  @Override
+  public List<PinotTaskConfig> generateAdhocTasks(TableConfig tableConfig, Map<String, String> taskConfigs)
+      throws Exception {
+    String taskUUID = UUID.randomUUID().toString();
+    // Only generate tasks for OFFLINE tables
+    String offlineTableName = tableConfig.getTableName();
+    if (tableConfig.getTableType() != TableType.OFFLINE) {
+      LOGGER.warn("Skip generating SegmentGenerationAndPushTask for non-OFFLINE table: {}", offlineTableName);
+      return ImmutableList.of();
+    }
+
+    TableTaskConfig tableTaskConfig = tableConfig.getTaskConfig();
+    if (tableTaskConfig != null) {
+      Map<String, String> taskConfigsFromTable =
+          tableTaskConfig.getConfigsForTaskType(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE);
+      if (taskConfigsFromTable != null) {
+        taskConfigsFromTable.entrySet().forEach(entry -> taskConfigs.putIfAbsent(entry.getKey(), entry.getValue()));
+      }
+    }
+    Map<String, String> batchConfigMap = new HashMap<>(taskConfigs);
+    int tableNumTasks = 0;
+    try {
+      URI inputDirURI =
+          SegmentGenerationUtils.getDirectoryURI(batchConfigMap.get(BatchConfigProperties.INPUT_DIR_URI));
+      List<URI> inputFileURIs = getInputFilesFromDirectory(batchConfigMap, inputDirURI, Collections.emptySet());
+      if (inputFileURIs.isEmpty()) {
+        LOGGER.warn("Skip generating SegmentGenerationAndPushTask, no input files found : {}", inputDirURI);
+        return ImmutableList.of();
+      }
+      if (!batchConfigMap.containsKey(BatchConfigProperties.INPUT_FORMAT)) {
+        batchConfigMap.put(BatchConfigProperties.INPUT_FORMAT,
+            extractFormatFromFileSuffix(inputFileURIs.get(0).getPath()));
+      }
+      updateRecordReaderConfigs(batchConfigMap);
+
+      List<PinotTaskConfig> pinotTaskConfigs = new ArrayList<>();
+      LOGGER.info("Final input files for task config generation: {}", inputFileURIs);
+      for (URI inputFileURI : inputFileURIs) {
+        Map<String, String> singleFileGenerationTaskConfig =
+            getSingleFileGenerationTaskConfig(offlineTableName, tableNumTasks, batchConfigMap, inputFileURI,
+                generateFixedSegmentName(offlineTableName, taskUUID, tableNumTasks));

Review Comment:
   Makes sense.



[GitHub] [pinot] xiangfu0 commented on a diff in pull request #8465: Add adhoc minion task creation endpoint

Posted by GitBox <gi...@apache.org>.
xiangfu0 commented on code in PR #8465:
URL: https://github.com/apache/pinot/pull/8465#discussion_r843359033


##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java:
##########
@@ -346,6 +347,29 @@ public StringResultResponse getTaskStateDeprecated(
     }
   }
 
+  @GET
+  @Path("/tasks/all")
+  @Authenticate(AccessType.READ)
+  @ApiOperation("Read all tasks status")
+  public Map<String, TaskState> getAllTasks() {
+    Map<String, TaskState> taskStates = new HashMap<>();
+    _pinotHelixTaskResourceManager.getTaskTypes()
+        .forEach(taskType -> taskStates.putAll(_pinotHelixTaskResourceManager.getTaskStates(taskType)));
+    return taskStates;
+  }
+
+  @POST
+  @Path("/tasks/create")

Review Comment:
   sure
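The `/tasks/all` handler quoted above merges the per-type task-state maps into a single map. The Java sketch below performs the same aggregation in a self-contained form. It uses a stand-in `TaskState` enum and a function parameter in place of `_pinotHelixTaskResourceManager`; both are hypothetical and are not the Helix or Pinot types.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

final class AllTasksSketch {
  // Stand-in for org.apache.helix.task.TaskState; only a few values are modeled.
  enum TaskState { IN_PROGRESS, COMPLETED, FAILED }

  private AllTasksSketch() {
  }

  // statesForType plays the role of getTaskStates(taskType): task name -> state.
  static Map<String, TaskState> allTaskStates(List<String> taskTypes,
      Function<String, Map<String, TaskState>> statesForType) {
    Map<String, TaskState> taskStates = new HashMap<>();
    for (String taskType : taskTypes) {
      // Task names embed their type, so entries from different types do not collide.
      taskStates.putAll(statesForType.apply(taskType));
    }
    return taskStates;
  }
}
```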





[GitHub] [pinot] mcvsubbu commented on a diff in pull request #8465: Add adhoc minion task creation endpoint

Posted by GitBox <gi...@apache.org>.
mcvsubbu commented on code in PR #8465:
URL: https://github.com/apache/pinot/pull/8465#discussion_r848650909


##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segmentgenerationandpush/SegmentGenerationAndPushTaskGenerator.java:
##########
@@ -177,6 +180,76 @@ public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
     return pinotTaskConfigs;
   }
 
+  @Override
+  public List<PinotTaskConfig> generateAdhocTasks(TableConfig tableConfig, Map<String, String> taskConfigs)
+      throws Exception {
+    String taskUUID = UUID.randomUUID().toString();
+    // Only generate tasks for OFFLINE tables
+    String offlineTableName = tableConfig.getTableName();
+    if (tableConfig.getTableType() != TableType.OFFLINE) {
+      LOGGER.warn("Skip generating SegmentGenerationAndPushTask for non-OFFLINE table: {}", offlineTableName);
+      return ImmutableList.of();

Review Comment:
   I am assuming that the external caller will just get an empty list of tasks in response, and perhaps a 2xx response. I am wondering whether that will result in a support call.
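An empty task list returned with a 2xx status looks like success to the caller. One option is to turn an empty result into an explicit error status. The sketch below assumes that the REST layer can see why nothing was generated; the generator in this hunk only logs the reason. The class, method, and status choices are hypothetical. The `createTask` hunk quoted later in this thread takes a similar route by throwing `NotFoundException` when no task config is generated.

```java
import java.util.List;

// Hypothetical sketch: map "nothing generated" to a client-visible error status.
final class AdhocResponseSketch {
  static final class Response {
    final int status;
    final String body;

    Response(int status, String body) {
      this.status = status;
      this.body = body;
    }
  }

  private AdhocResponseSketch() {
  }

  static Response toResponse(List<String> generatedTaskNames, String reason) {
    if (generatedTaskNames.isEmpty()) {
      // 404 mirrors the NotFoundException in createTask; 400 is also defensible
      // when the cause is bad input such as a non-OFFLINE table.
      return new Response(404, "No task config has been generated: " + reason);
    }
    return new Response(200, String.join(",", generatedTaskNames));
  }
}
```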





[GitHub] [pinot] xiangfu0 commented on a diff in pull request #8465: Add adhoc minion task creation endpoint

Posted by GitBox <gi...@apache.org>.
xiangfu0 commented on code in PR #8465:
URL: https://github.com/apache/pinot/pull/8465#discussion_r847829513


##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segmentgenerationandpush/SegmentGenerationAndPushTaskGenerator.java:
##########
@@ -177,6 +180,76 @@ public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
     return pinotTaskConfigs;
   }
 
+  @Override
+  public List<PinotTaskConfig> generateAdhocTasks(TableConfig tableConfig, Map<String, String> taskConfigs)
+      throws Exception {
+    String taskUUID = UUID.randomUUID().toString();
+    // Only generate tasks for OFFLINE tables
+    String offlineTableName = tableConfig.getTableName();
+    if (tableConfig.getTableType() != TableType.OFFLINE) {
+      LOGGER.warn("Skip generating SegmentGenerationAndPushTask for non-OFFLINE table: {}", offlineTableName);
+      return ImmutableList.of();
+    }
+
+    // Override task configs from table with adhoc task configs.
+    Map<String, String> batchConfigMap = new HashMap<>();
+    TableTaskConfig tableTaskConfig = tableConfig.getTaskConfig();
+    if (tableTaskConfig != null) {
+      batchConfigMap.putAll(
+          tableTaskConfig.getConfigsForTaskType(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE));
+    }
+    batchConfigMap.putAll(taskConfigs);
+
+    int tableNumTasks = 0;
+    try {
+      URI inputDirURI =
+          SegmentGenerationUtils.getDirectoryURI(batchConfigMap.get(BatchConfigProperties.INPUT_DIR_URI));
+      List<URI> inputFileURIs = getInputFilesFromDirectory(batchConfigMap, inputDirURI, Collections.emptySet());
+      if (inputFileURIs.isEmpty()) {
+        LOGGER.warn("Skip generating SegmentGenerationAndPushTask, no input files found : {}", inputDirURI);
+        return ImmutableList.of();

Review Comment:
   This is only for internal logging; it is not propagated. Do you mean we should expose this to the client?
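The quoted hunk builds `batchConfigMap` in two steps. It first copies the table-level `SegmentGenerationAndPushTask` configs, then copies the ad-hoc request configs, so request values win on key collisions. Below is a minimal Java sketch of that precedence rule; the class name and keys are illustrative. It keeps the null check on the table-level map that an earlier revision of this method (quoted elsewhere in this thread) had. That check matters if `getConfigsForTaskType` returns null for a table without that task type, which is an assumption here, because `HashMap.putAll(null)` throws a `NullPointerException`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of "table-level configs, overridden by ad-hoc configs".
final class AdhocConfigMerge {
  private AdhocConfigMerge() {
  }

  static Map<String, String> merge(Map<String, String> tableLevelConfigs, Map<String, String> adhocConfigs) {
    Map<String, String> merged = new HashMap<>();
    if (tableLevelConfigs != null) {
      merged.putAll(tableLevelConfigs);
    }
    // The later putAll wins on key collisions, so ad-hoc values take precedence.
    // Building a fresh map also leaves the caller's adhocConfigs unmodified.
    merged.putAll(adhocConfigs);
    return merged;
  }
}
```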



##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segmentgenerationandpush/SegmentGenerationAndPushTaskGenerator.java:
##########
@@ -177,6 +180,76 @@ public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
     return pinotTaskConfigs;
   }
 
+  @Override
+  public List<PinotTaskConfig> generateAdhocTasks(TableConfig tableConfig, Map<String, String> taskConfigs)
+      throws Exception {
+    String taskUUID = UUID.randomUUID().toString();
+    // Only generate tasks for OFFLINE tables
+    String offlineTableName = tableConfig.getTableName();
+    if (tableConfig.getTableType() != TableType.OFFLINE) {
+      LOGGER.warn("Skip generating SegmentGenerationAndPushTask for non-OFFLINE table: {}", offlineTableName);
+      return ImmutableList.of();
+    }
+
+    // Override task configs from table with adhoc task configs.
+    Map<String, String> batchConfigMap = new HashMap<>();
+    TableTaskConfig tableTaskConfig = tableConfig.getTaskConfig();
+    if (tableTaskConfig != null) {
+      batchConfigMap.putAll(
+          tableTaskConfig.getConfigsForTaskType(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE));
+    }
+    batchConfigMap.putAll(taskConfigs);
+
+    int tableNumTasks = 0;
+    try {
+      URI inputDirURI =
+          SegmentGenerationUtils.getDirectoryURI(batchConfigMap.get(BatchConfigProperties.INPUT_DIR_URI));
+      List<URI> inputFileURIs = getInputFilesFromDirectory(batchConfigMap, inputDirURI, Collections.emptySet());
+      if (inputFileURIs.isEmpty()) {
+        LOGGER.warn("Skip generating SegmentGenerationAndPushTask, no input files found : {}", inputDirURI);
+        return ImmutableList.of();
+      }
+      if (!batchConfigMap.containsKey(BatchConfigProperties.INPUT_FORMAT)) {
+        batchConfigMap.put(BatchConfigProperties.INPUT_FORMAT,
+            extractFormatFromFileSuffix(inputFileURIs.get(0).getPath()));
+      }
+      updateRecordReaderConfigs(batchConfigMap);
+
+      List<PinotTaskConfig> pinotTaskConfigs = new ArrayList<>();
+      LOGGER.info("Final input files for task config generation: {}", inputFileURIs);

Review Comment:
   done.
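When the request does not set an input format, the hunk infers one from the suffix of the first input file. The Java sketch below expresses that defaulting step with `Map.computeIfAbsent`. It assumes a hypothetical `inputFormat` key (the real code uses `BatchConfigProperties.INPUT_FORMAT`) and a caller-supplied function that maps a path to a format.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

final class InputFormatDefaulting {
  // Hypothetical key; the PR uses BatchConfigProperties.INPUT_FORMAT.
  static final String INPUT_FORMAT_KEY = "inputFormat";

  private InputFormatDefaulting() {
  }

  // Returns a copy of batchConfig with the input format filled in only if it was absent.
  // An explicitly configured format always wins; otherwise the first file's path decides.
  static Map<String, String> withDefaultInputFormat(Map<String, String> batchConfig,
      List<String> inputFilePaths, Function<String, String> formatFromPath) {
    Map<String, String> result = new HashMap<>(batchConfig);
    result.computeIfAbsent(INPUT_FORMAT_KEY, key -> formatFromPath.apply(inputFilePaths.get(0)));
    return result;
  }
}
```

Only the first file is inspected, so a directory that mixes formats still receives a single inferred format.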





[GitHub] [pinot] xiangfu0 commented on a diff in pull request #8465: Add adhoc minion task creation endpoint

Posted by GitBox <gi...@apache.org>.
xiangfu0 commented on code in PR #8465:
URL: https://github.com/apache/pinot/pull/8465#discussion_r847823265


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java:
##########
@@ -611,6 +636,10 @@ private static String getTaskType(String name) {
     return name.split(TASK_NAME_SEPARATOR)[1];
   }
 
+  public String getParentTaskName(String taskType, String taskName) {
+    return TASK_PREFIX + taskType + TASK_NAME_SEPARATOR + taskName;

Review Comment:
   No need; this is just to generate a random task name if it doesn't exist.
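`getParentTaskName` and `getTaskType` round-trip as long as task types contain no separator. The type is always the second separator-delimited token, so a user-supplied task name may itself contain the separator. Below is a Java sketch of the round trip. The constant values (a `_` separator and a `Task_` prefix) are assumptions for illustration and may differ from the real constants in `PinotHelixTaskResourceManager`.

```java
// Hypothetical sketch of the Helix task naming round trip; constant values are assumed.
final class TaskNames {
  static final String TASK_NAME_SEPARATOR = "_";
  static final String TASK_PREFIX = "Task" + TASK_NAME_SEPARATOR;

  private TaskNames() {
  }

  static String parentTaskName(String taskType, String taskName) {
    return TASK_PREFIX + taskType + TASK_NAME_SEPARATOR + taskName;
  }

  // Mirrors getTaskType(name): the type is the second separator-delimited token.
  // Note that String.split takes a regex; "_" has no special meaning there.
  static String taskType(String name) {
    return name.split(TASK_NAME_SEPARATOR)[1];
  }
}
```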





[GitHub] [pinot] xiangfu0 commented on a diff in pull request #8465: Add adhoc minion task creation endpoint

Posted by GitBox <gi...@apache.org>.
xiangfu0 commented on code in PR #8465:
URL: https://github.com/apache/pinot/pull/8465#discussion_r848727357


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java:
##########
@@ -347,7 +368,11 @@ public synchronized Set<String> getTasksInProgress(String taskType) {
    */
   public synchronized TaskState getTaskState(String taskName) {
     String taskType = getTaskType(taskName);
-    return _taskDriver.getWorkflowContext(getHelixJobQueueName(taskType)).getJobState(getHelixJobName(taskName));
+    WorkflowContext workflowContext = _taskDriver.getWorkflowContext(getHelixJobQueueName(taskType));
+    if (workflowContext == null) {

Review Comment:
   The periodic call won't help; see the same check at line 390 (https://github.com/apache/pinot/pull/8465/files#diff-1ff7c17d681951c55f3f7b0540609805063b2f9ffa90df78a93e440d85445d34R390) as well.
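The added guard keeps `getTaskState` from dereferencing a missing workflow context. The body of the `if` is cut off in the quote, but `createTask` treats a null state as "not created yet", so the guard presumably returns null. The Java sketch below shows that lookup, with nested maps standing in for Helix's `WorkflowContext`. These are hypothetical types, not the Helix API.

```java
import java.util.Map;

// Hypothetical sketch of a null-safe task-state lookup.
final class TaskStateLookup {
  private TaskStateLookup() {
  }

  // workflowContexts: job queue name -> (job name -> state), standing in for WorkflowContext.
  static String taskState(Map<String, Map<String, String>> workflowContexts, String queueName, String jobName) {
    Map<String, String> workflowContext = workflowContexts.get(queueName);
    if (workflowContext == null) {
      // No queue context yet for this task type: report "unknown" instead of throwing an NPE.
      return null;
    }
    return workflowContext.get(jobName);
  }
}
```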





[GitHub] [pinot] xiangfu0 commented on a diff in pull request #8465: Add adhoc minion task creation endpoint

Posted by GitBox <gi...@apache.org>.
xiangfu0 commented on code in PR #8465:
URL: https://github.com/apache/pinot/pull/8465#discussion_r843356509


##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segmentgenerationandpush/SegmentGenerationAndPushTaskGenerator.java:
##########
@@ -177,6 +180,78 @@ public String getTaskType() {
     return pinotTaskConfigs;
   }
 
+  @Override
+  public List<PinotTaskConfig> generateAdhocTasks(TableConfig tableConfig, Map<String, String> taskConfigs)
+      throws Exception {
+    String taskUUID = UUID.randomUUID().toString();
+    // Only generate tasks for OFFLINE tables
+    String offlineTableName = tableConfig.getTableName();
+    if (tableConfig.getTableType() != TableType.OFFLINE) {
+      LOGGER.warn("Skip generating SegmentGenerationAndPushTask for non-OFFLINE table: {}", offlineTableName);
+      return ImmutableList.of();
+    }
+
+    TableTaskConfig tableTaskConfig = tableConfig.getTaskConfig();
+    if (tableTaskConfig != null) {
+      Map<String, String> taskConfigsFromTable =
+          tableTaskConfig.getConfigsForTaskType(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE);
+      if (taskConfigsFromTable != null) {
+        taskConfigsFromTable.entrySet().forEach(entry -> taskConfigs.putIfAbsent(entry.getKey(), entry.getValue()));
+      }
+    }
+    Map<String, String> batchConfigMap = new HashMap<>(taskConfigs);
+    int tableNumTasks = 0;
+    try {
+      URI inputDirURI =
+          SegmentGenerationUtils.getDirectoryURI(batchConfigMap.get(BatchConfigProperties.INPUT_DIR_URI));
+      List<URI> inputFileURIs = getInputFilesFromDirectory(batchConfigMap, inputDirURI, Collections.emptySet());
+      if (inputFileURIs.isEmpty()) {
+        LOGGER.warn("Skip generating SegmentGenerationAndPushTask, no input files found : {}", inputDirURI);
+        return ImmutableList.of();
+      }
+      if (!batchConfigMap.containsKey(BatchConfigProperties.INPUT_FORMAT)) {
+        batchConfigMap.put(BatchConfigProperties.INPUT_FORMAT,
+            extractFormatFromFileSuffix(inputFileURIs.get(0).getPath()));
+      }
+      updateRecordReaderConfigs(batchConfigMap);
+
+      List<PinotTaskConfig> pinotTaskConfigs = new ArrayList<>();
+      LOGGER.info("Final input files for task config generation: {}", inputFileURIs);
+      for (URI inputFileURI : inputFileURIs) {
+        Map<String, String> singleFileGenerationTaskConfig =
+            getSingleFileGenerationTaskConfig(offlineTableName, tableNumTasks, batchConfigMap, inputFileURI,
+                generateFixedSegmentName(offlineTableName, taskUUID, tableNumTasks));
+        pinotTaskConfigs.add(new PinotTaskConfig(MinionConstants.SegmentGenerationAndPushTask.TASK_TYPE,
+            singleFileGenerationTaskConfig));
+        tableNumTasks++;
+      }
+      if (!batchConfigMap.containsKey(BatchConfigProperties.INPUT_FORMAT)) {
+        batchConfigMap.put(BatchConfigProperties.INPUT_FORMAT,
+            extractFormatFromFileSuffix(inputFileURIs.get(0).getPath()));
+        updateRecordReaderConfigs(batchConfigMap);
+      }
+      return pinotTaskConfigs;
+    } catch (Exception e) {
+      LOGGER.error("Unable to generate the SegmentGenerationAndPush task. [ table configs: {}, task configs: {} ]",
+          tableConfig, taskConfigs, e);
+      throw e;
+    }
+  }
+
+  private String generateFixedSegmentName(String offlineTableName, String taskUUID, int tableNumTasks) {
+    return String.format("%s_%s_%d", offlineTableName, taskUUID, tableNumTasks);
+  }
+
+  private String extractFormatFromFileSuffix(String path) {
+    String fileExtension = path.substring(path.lastIndexOf(".") + 1);
+    switch (fileExtension) {
+      case "":
+        throw new UnsupportedOperationException("No file extension found");

Review Comment:
   Will change the logic to an if check.
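The if-based rewrite should handle one detail in the quoted code. `path.substring(path.lastIndexOf(".") + 1)` returns the whole path when there is no dot, because `lastIndexOf` returns -1. As a result, the `case ""` branch only fires for paths that end in a dot. The Java sketch below adds an explicit check that also ignores dots in directory names. The list of supported extensions is an assumption, not Pinot's actual mapping.

```java
import java.util.Locale;

// Hypothetical sketch of suffix-based format detection using an explicit if check.
final class FileFormatFromPath {
  private FileFormatFromPath() {
  }

  static String extractFormatFromFileSuffix(String path) {
    // Look only at the last path segment so a dot in a directory name is ignored.
    String fileName = path.substring(path.lastIndexOf('/') + 1);
    int dot = fileName.lastIndexOf('.');
    if (dot < 0 || dot == fileName.length() - 1) {
      throw new UnsupportedOperationException("No file extension found in: " + path);
    }
    String extension = fileName.substring(dot + 1).toLowerCase(Locale.ROOT);
    // Illustrative set of extensions only.
    switch (extension) {
      case "avro":
      case "csv":
      case "json":
      case "orc":
      case "parquet":
        return extension;
      default:
        throw new UnsupportedOperationException("Unsupported file extension: " + extension);
    }
  }
}
```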





[GitHub] [pinot] xiangfu0 commented on a diff in pull request #8465: Add adhoc minion task creation endpoint

Posted by GitBox <gi...@apache.org>.
xiangfu0 commented on code in PR #8465:
URL: https://github.com/apache/pinot/pull/8465#discussion_r843347860


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java:
##########
@@ -128,6 +132,53 @@ public PinotTaskManager(PinotHelixTaskResourceManager helixTaskResourceManager,
     }
   }
 
+  public String createTask(Map<String, String> taskConfigs)
+      throws Exception {
+    String taskName = taskConfigs.get("taskName");
+    if (taskName == null) {
+      throw new IllegalArgumentException("Missing field 'taskName'");
+    }
+    String taskType = taskConfigs.get("taskType");
+    if (taskType == null) {
+      throw new IllegalArgumentException("Missing field 'taskType'");
+    }
+    String tableName = taskConfigs.get("tableName");
+    if (tableName == null) {
+      throw new IllegalArgumentException("Missing field 'tableName'");
+    }
+    String parentTaskName = _helixTaskResourceManager.getParentTaskName(taskType, taskName);
+    TaskState taskState = _helixTaskResourceManager.getTaskState(parentTaskName);
+    if (taskState != null) {
+      throw new RuntimeException(
+          "Task [" + taskName + "] of type [" + taskType + "] is already created. Current state is " + taskState);
+    }
+    if (TableNameBuilder.isRealtimeTableResource(tableName)) {
+      throw new UnsupportedOperationException(
+          "Realtime table: " + tableName + " is not supported for task type - " + taskType);
+    }
+    String tableNameWithType =
+        TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(TableNameBuilder.extractRawTableName(tableName));
+    TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType);
+    PinotTaskGenerator taskGenerator = _taskGeneratorRegistry.getTaskGenerator(taskType);
+    // Generate each type of tasks
+    if (taskGenerator != null) {
+      _helixTaskResourceManager.ensureTaskQueueExists(taskType);
+      addTaskTypeMetricsUpdaterIfNeeded(taskType);
+      LOGGER.info("Trying to create a task type: {}", taskGenerator.getTaskType());

Review Comment:
   will do
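The three required-field checks at the top of `createTask` all follow the same pattern. The minimal Java sketch below factors them into a hypothetical helper that keeps the same exception type and message format.

```java
import java.util.Map;

// Hypothetical helper; same semantics as the inline null checks in createTask.
final class RequiredFields {
  private RequiredFields() {
  }

  static String requireField(Map<String, String> taskConfigs, String field) {
    String value = taskConfigs.get(field);
    if (value == null) {
      throw new IllegalArgumentException("Missing field '" + field + "'");
    }
    return value;
  }
}
```

Usage would look like `String taskName = RequiredFields.requireField(taskConfigs, "taskName");`, repeated for `taskType` and `tableName`.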



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java:
##########
@@ -128,6 +132,53 @@ public PinotTaskManager(PinotHelixTaskResourceManager helixTaskResourceManager,
     }
   }
 
+  public String createTask(Map<String, String> taskConfigs)
+      throws Exception {
+    String taskName = taskConfigs.get("taskName");
+    if (taskName == null) {
+      throw new IllegalArgumentException("Missing field 'taskName'");
+    }
+    String taskType = taskConfigs.get("taskType");
+    if (taskType == null) {
+      throw new IllegalArgumentException("Missing field 'taskType'");
+    }
+    String tableName = taskConfigs.get("tableName");
+    if (tableName == null) {
+      throw new IllegalArgumentException("Missing field 'tableName'");
+    }
+    String parentTaskName = _helixTaskResourceManager.getParentTaskName(taskType, taskName);
+    TaskState taskState = _helixTaskResourceManager.getTaskState(parentTaskName);
+    if (taskState != null) {
+      throw new RuntimeException(
+          "Task [" + taskName + "] of type [" + taskType + "] is already created. Current state is " + taskState);
+    }
+    if (TableNameBuilder.isRealtimeTableResource(tableName)) {
+      throw new UnsupportedOperationException(
+          "Realtime table: " + tableName + " is not supported for task type - " + taskType);
+    }
+    String tableNameWithType =
+        TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(TableNameBuilder.extractRawTableName(tableName));
+    TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType);
+    PinotTaskGenerator taskGenerator = _taskGeneratorRegistry.getTaskGenerator(taskType);
+    // Generate each type of tasks
+    if (taskGenerator != null) {
+      _helixTaskResourceManager.ensureTaskQueueExists(taskType);
+      addTaskTypeMetricsUpdaterIfNeeded(taskType);
+      LOGGER.info("Trying to create a task type: {}", taskGenerator.getTaskType());
+      List<PinotTaskConfig> pinotTaskConfigs = taskGenerator.generateAdhocTasks(tableConfig, taskConfigs);
+      if (pinotTaskConfigs.isEmpty()) {
+        throw new NotFoundException("No task config has been generated");
+      }
+      LOGGER.info("Submitting ad-hoc task for task type: {} with task configs: {}", taskType, pinotTaskConfigs);
+      _controllerMetrics.addMeteredTableValue(taskType, ControllerMeter.NUMBER_TASKS_SUBMITTED, 1);

Review Comment:
   will do
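`createTask` rejects REALTIME table names. Otherwise it normalizes the input to the OFFLINE table name via `TableNameBuilder`. The Java sketch below performs that normalization in plain Java, assuming the `_OFFLINE`/`_REALTIME` suffix convention. It is a hypothetical re-implementation for illustration, not `TableNameBuilder` itself.

```java
// Hypothetical sketch of "reject REALTIME, normalize to OFFLINE" table-name handling.
final class OfflineTableNames {
  // Suffixes assumed to match Pinot's table-name-with-type convention.
  static final String OFFLINE_SUFFIX = "_OFFLINE";
  static final String REALTIME_SUFFIX = "_REALTIME";

  private OfflineTableNames() {
  }

  static String toOfflineTableName(String tableName, String taskType) {
    if (tableName.endsWith(REALTIME_SUFFIX)) {
      throw new UnsupportedOperationException(
          "Realtime table: " + tableName + " is not supported for task type - " + taskType);
    }
    // Accept either a raw name or an OFFLINE name with type; both map to the OFFLINE name.
    String rawTableName = tableName.endsWith(OFFLINE_SUFFIX)
        ? tableName.substring(0, tableName.length() - OFFLINE_SUFFIX.length())
        : tableName;
    return rawTableName + OFFLINE_SUFFIX;
  }
}
```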





[GitHub] [pinot] xiangfu0 commented on a diff in pull request #8465: Add adhoc minion task creation endpoint

Posted by GitBox <gi...@apache.org>.
xiangfu0 commented on code in PR #8465:
URL: https://github.com/apache/pinot/pull/8465#discussion_r849905411


##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java:
##########
@@ -366,6 +379,33 @@ public Map<String, String> scheduleTasks(@ApiParam(value = "Task type") @QueryPa
     }
   }
 
+  @POST
+  @Path("/tasks/execute")
+  @Authenticate(AccessType.CREATE)
+  @ApiOperation("Create an adhoc task to be running on minion")

Review Comment:
   Done


